| OLD | NEW |
| (Empty) |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/common/handle_watcher.h" | |
| 6 | |
| 7 #include <map> | |
| 8 | |
| 9 #include "base/atomic_sequence_num.h" | |
| 10 #include "base/bind.h" | |
| 11 #include "base/lazy_instance.h" | |
| 12 #include "base/logging.h" | |
| 13 #include "base/macros.h" | |
| 14 #include "base/memory/singleton.h" | |
| 15 #include "base/memory/weak_ptr.h" | |
| 16 #include "base/message_loop/message_loop.h" | |
| 17 #include "base/message_loop/message_loop_proxy.h" | |
| 18 #include "base/synchronization/lock.h" | |
| 19 #include "base/synchronization/waitable_event.h" | |
| 20 #include "base/threading/thread.h" | |
| 21 #include "base/threading/thread_restrictions.h" | |
| 22 #include "base/time/time.h" | |
| 23 #include "mojo/common/message_pump_mojo.h" | |
| 24 #include "mojo/common/message_pump_mojo_handler.h" | |
| 25 #include "mojo/common/time_helper.h" | |
| 26 | |
| 27 namespace mojo { | |
| 28 namespace common { | |
| 29 | |
| 30 typedef int WatcherID; | |
| 31 | |
| 32 namespace { | |
| 33 | |
| 34 const char kWatcherThreadName[] = "handle-watcher-thread"; | |
| 35 | |
| 36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { | |
| 37 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : | |
| 38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); | |
| 39 } | |
| 40 | |
| 41 // Tracks the data for a single call to Start(). | |
| 42 struct WatchData { | |
| 43 WatchData() | |
| 44 : id(0), | |
| 45 handle_signals(MOJO_HANDLE_SIGNAL_NONE), | |
| 46 message_loop(NULL) {} | |
| 47 | |
| 48 WatcherID id; | |
| 49 Handle handle; | |
| 50 MojoHandleSignals handle_signals; | |
| 51 base::TimeTicks deadline; | |
| 52 base::Callback<void(MojoResult)> callback; | |
| 53 scoped_refptr<base::MessageLoopProxy> message_loop; | |
| 54 }; | |
| 55 | |
| 56 // WatcherBackend -------------------------------------------------------------- | |
| 57 | |
| 58 // WatcherBackend is responsible for managing the requests and interacting with | |
| 59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the | |
| 60 // thread WatcherThreadManager creates. | |
| 61 class WatcherBackend : public MessagePumpMojoHandler { | |
| 62 public: | |
| 63 WatcherBackend(); | |
| 64 ~WatcherBackend() override; | |
| 65 | |
| 66 void StartWatching(const WatchData& data); | |
| 67 | |
| 68 // Cancels a previously scheduled request to start a watch. | |
| 69 void StopWatching(WatcherID watcher_id); | |
| 70 | |
| 71 private: | |
| 72 typedef std::map<Handle, WatchData> HandleToWatchDataMap; | |
| 73 | |
| 74 // Invoked when a handle needs to be removed and notified. | |
| 75 void RemoveAndNotify(const Handle& handle, MojoResult result); | |
| 76 | |
| 77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found | |
| 78 // and sets |handle| to the Handle. Returns false if not a known id. | |
| 79 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; | |
| 80 | |
| 81 // MessagePumpMojoHandler overrides: | |
| 82 void OnHandleReady(const Handle& handle) override; | |
| 83 void OnHandleError(const Handle& handle, MojoResult result) override; | |
| 84 | |
| 85 // Maps from assigned id to WatchData. | |
| 86 HandleToWatchDataMap handle_to_data_; | |
| 87 | |
| 88 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); | |
| 89 }; | |
| 90 | |
| 91 WatcherBackend::WatcherBackend() { | |
| 92 } | |
| 93 | |
| 94 WatcherBackend::~WatcherBackend() { | |
| 95 } | |
| 96 | |
| 97 void WatcherBackend::StartWatching(const WatchData& data) { | |
| 98 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); | |
| 99 | |
| 100 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); | |
| 101 | |
| 102 handle_to_data_[data.handle] = data; | |
| 103 MessagePumpMojo::current()->AddHandler(this, data.handle, | |
| 104 data.handle_signals, | |
| 105 data.deadline); | |
| 106 } | |
| 107 | |
| 108 void WatcherBackend::StopWatching(WatcherID watcher_id) { | |
| 109 // Because of the thread hop it is entirely possible to get here and not | |
| 110 // have a valid handle registered for |watcher_id|. | |
| 111 Handle handle; | |
| 112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | |
| 113 handle_to_data_.erase(handle); | |
| 114 MessagePumpMojo::current()->RemoveHandler(handle); | |
| 115 } | |
| 116 } | |
| 117 | |
| 118 void WatcherBackend::RemoveAndNotify(const Handle& handle, | |
| 119 MojoResult result) { | |
| 120 if (handle_to_data_.count(handle) == 0) | |
| 121 return; | |
| 122 | |
| 123 const WatchData data(handle_to_data_[handle]); | |
| 124 handle_to_data_.erase(handle); | |
| 125 MessagePumpMojo::current()->RemoveHandler(handle); | |
| 126 | |
| 127 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); | |
| 128 } | |
| 129 | |
| 130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, | |
| 131 Handle* handle) const { | |
| 132 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); | |
| 133 i != handle_to_data_.end(); ++i) { | |
| 134 if (i->second.id == watcher_id) { | |
| 135 *handle = i->second.handle; | |
| 136 return true; | |
| 137 } | |
| 138 } | |
| 139 return false; | |
| 140 } | |
| 141 | |
| 142 void WatcherBackend::OnHandleReady(const Handle& handle) { | |
| 143 RemoveAndNotify(handle, MOJO_RESULT_OK); | |
| 144 } | |
| 145 | |
| 146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { | |
| 147 RemoveAndNotify(handle, result); | |
| 148 } | |
| 149 | |
| 150 // WatcherThreadManager -------------------------------------------------------- | |
| 151 | |
| 152 // WatcherThreadManager manages the background thread that listens for handles | |
| 153 // to be ready. All requests are handled by WatcherBackend. | |
| 154 } // namespace | |
| 155 | |
| 156 class WatcherThreadManager { | |
| 157 public: | |
| 158 ~WatcherThreadManager(); | |
| 159 | |
| 160 // Returns the shared instance. | |
| 161 static WatcherThreadManager* GetInstance(); | |
| 162 | |
| 163 // Starts watching the requested handle. Returns a unique ID that is used to | |
| 164 // stop watching the handle. When the handle is ready |callback| is notified | |
| 165 // on the thread StartWatching() was invoked on. | |
| 166 // This may be invoked on any thread. | |
| 167 WatcherID StartWatching(const Handle& handle, | |
| 168 MojoHandleSignals handle_signals, | |
| 169 base::TimeTicks deadline, | |
| 170 const base::Callback<void(MojoResult)>& callback); | |
| 171 | |
| 172 // Stops watching a handle. | |
| 173 // This may be invoked on any thread. | |
| 174 void StopWatching(WatcherID watcher_id); | |
| 175 | |
| 176 private: | |
| 177 enum RequestType { | |
| 178 REQUEST_START, | |
| 179 REQUEST_STOP, | |
| 180 }; | |
| 181 | |
| 182 // See description of |requests_| for details. | |
| 183 struct RequestData { | |
| 184 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} | |
| 185 | |
| 186 RequestType type; | |
| 187 WatchData start_data; | |
| 188 WatcherID stop_id; | |
| 189 base::WaitableEvent* stop_event; | |
| 190 }; | |
| 191 | |
| 192 typedef std::vector<RequestData> Requests; | |
| 193 | |
| 194 friend struct DefaultSingletonTraits<WatcherThreadManager>; | |
| 195 | |
| 196 WatcherThreadManager(); | |
| 197 | |
| 198 // Schedules a request on the background thread. See |requests_| for details. | |
| 199 void AddRequest(const RequestData& data); | |
| 200 | |
| 201 // Processes requests added to |requests_|. This is invoked on the backend | |
| 202 // thread. | |
| 203 void ProcessRequestsOnBackendThread(); | |
| 204 | |
| 205 base::Thread thread_; | |
| 206 | |
| 207 base::AtomicSequenceNumber watcher_id_generator_; | |
| 208 | |
| 209 WatcherBackend backend_; | |
| 210 | |
| 211 // Protects |requests_|. | |
| 212 base::Lock lock_; | |
| 213 | |
| 214 // Start/Stop result in adding a RequestData to |requests_| (protected by | |
| 215 // |lock_|). When the background thread wakes up it processes the requests. | |
| 216 Requests requests_; | |
| 217 | |
| 218 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); | |
| 219 }; | |
| 220 | |
| 221 WatcherThreadManager::~WatcherThreadManager() { | |
| 222 thread_.Stop(); | |
| 223 } | |
| 224 | |
| 225 WatcherThreadManager* WatcherThreadManager::GetInstance() { | |
| 226 return Singleton<WatcherThreadManager>::get(); | |
| 227 } | |
| 228 | |
| 229 WatcherID WatcherThreadManager::StartWatching( | |
| 230 const Handle& handle, | |
| 231 MojoHandleSignals handle_signals, | |
| 232 base::TimeTicks deadline, | |
| 233 const base::Callback<void(MojoResult)>& callback) { | |
| 234 RequestData request_data; | |
| 235 request_data.type = REQUEST_START; | |
| 236 request_data.start_data.id = watcher_id_generator_.GetNext(); | |
| 237 request_data.start_data.handle = handle; | |
| 238 request_data.start_data.callback = callback; | |
| 239 request_data.start_data.handle_signals = handle_signals; | |
| 240 request_data.start_data.deadline = deadline; | |
| 241 request_data.start_data.message_loop = base::MessageLoopProxy::current(); | |
| 242 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), | |
| 243 request_data.start_data.message_loop.get()); | |
| 244 AddRequest(request_data); | |
| 245 return request_data.start_data.id; | |
| 246 } | |
| 247 | |
| 248 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { | |
| 249 // Handle the case of StartWatching() followed by StopWatching() before | |
| 250 // |thread_| woke up. | |
| 251 { | |
| 252 base::AutoLock auto_lock(lock_); | |
| 253 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { | |
| 254 if (i->type == REQUEST_START && i->start_data.id == watcher_id) { | |
| 255 // Watcher ids are not reused, so if we find it we can stop. | |
| 256 requests_.erase(i); | |
| 257 return; | |
| 258 } | |
| 259 } | |
| 260 } | |
| 261 | |
| 262 base::ThreadRestrictions::ScopedAllowWait allow_wait; | |
| 263 base::WaitableEvent event(true, false); | |
| 264 RequestData request_data; | |
| 265 request_data.type = REQUEST_STOP; | |
| 266 request_data.stop_id = watcher_id; | |
| 267 request_data.stop_event = &event; | |
| 268 AddRequest(request_data); | |
| 269 | |
| 270 // We need to block until the handle is actually removed. | |
| 271 event.Wait(); | |
| 272 } | |
| 273 | |
| 274 void WatcherThreadManager::AddRequest(const RequestData& data) { | |
| 275 { | |
| 276 base::AutoLock auto_lock(lock_); | |
| 277 const bool was_empty = requests_.empty(); | |
| 278 requests_.push_back(data); | |
| 279 if (!was_empty) | |
| 280 return; | |
| 281 } | |
| 282 // We own |thread_|, so it's safe to use Unretained() here. | |
| 283 thread_.message_loop()->PostTask( | |
| 284 FROM_HERE, | |
| 285 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, | |
| 286 base::Unretained(this))); | |
| 287 } | |
| 288 | |
| 289 void WatcherThreadManager::ProcessRequestsOnBackendThread() { | |
| 290 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); | |
| 291 | |
| 292 Requests requests; | |
| 293 { | |
| 294 base::AutoLock auto_lock(lock_); | |
| 295 requests_.swap(requests); | |
| 296 } | |
| 297 for (size_t i = 0; i < requests.size(); ++i) { | |
| 298 if (requests[i].type == REQUEST_START) { | |
| 299 backend_.StartWatching(requests[i].start_data); | |
| 300 } else { | |
| 301 backend_.StopWatching(requests[i].stop_id); | |
| 302 requests[i].stop_event->Signal(); | |
| 303 } | |
| 304 } | |
| 305 } | |
| 306 | |
| 307 WatcherThreadManager::WatcherThreadManager() | |
| 308 : thread_(kWatcherThreadName) { | |
| 309 base::Thread::Options thread_options; | |
| 310 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); | |
| 311 thread_.StartWithOptions(thread_options); | |
| 312 } | |
| 313 | |
| 314 // HandleWatcher::StateBase and subclasses ------------------------------------- | |
| 315 | |
| 316 // The base class of HandleWatcher's state. Owns the user's callback and | |
| 317 // monitors the current thread's MessageLoop to know when to force the callback | |
| 318 // to run (with an error) even though the pipe hasn't been signaled yet. | |
| 319 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { | |
| 320 public: | |
| 321 StateBase(HandleWatcher* watcher, | |
| 322 const base::Callback<void(MojoResult)>& callback) | |
| 323 : watcher_(watcher), | |
| 324 callback_(callback), | |
| 325 got_ready_(false) { | |
| 326 base::MessageLoop::current()->AddDestructionObserver(this); | |
| 327 } | |
| 328 | |
| 329 ~StateBase() override { | |
| 330 base::MessageLoop::current()->RemoveDestructionObserver(this); | |
| 331 } | |
| 332 | |
| 333 protected: | |
| 334 void NotifyHandleReady(MojoResult result) { | |
| 335 got_ready_ = true; | |
| 336 NotifyAndDestroy(result); | |
| 337 } | |
| 338 | |
| 339 bool got_ready() const { return got_ready_; } | |
| 340 | |
| 341 private: | |
| 342 void WillDestroyCurrentMessageLoop() override { | |
| 343 // The current thread is exiting. Simulate a watch error. | |
| 344 NotifyAndDestroy(MOJO_RESULT_ABORTED); | |
| 345 } | |
| 346 | |
| 347 void NotifyAndDestroy(MojoResult result) { | |
| 348 base::Callback<void(MojoResult)> callback = callback_; | |
| 349 watcher_->Stop(); // Destroys |this|. | |
| 350 | |
| 351 callback.Run(result); | |
| 352 } | |
| 353 | |
| 354 HandleWatcher* watcher_; | |
| 355 base::Callback<void(MojoResult)> callback_; | |
| 356 | |
| 357 // Have we been notified that the handle is ready? | |
| 358 bool got_ready_; | |
| 359 | |
| 360 DISALLOW_COPY_AND_ASSIGN(StateBase); | |
| 361 }; | |
| 362 | |
| 363 // If the thread on which HandleWatcher is used runs MessagePumpMojo, | |
| 364 // SameThreadWatchingState is used to directly watch the handle on the same | |
| 365 // thread. | |
| 366 class HandleWatcher::SameThreadWatchingState : public StateBase, | |
| 367 public MessagePumpMojoHandler { | |
| 368 public: | |
| 369 SameThreadWatchingState(HandleWatcher* watcher, | |
| 370 const Handle& handle, | |
| 371 MojoHandleSignals handle_signals, | |
| 372 MojoDeadline deadline, | |
| 373 const base::Callback<void(MojoResult)>& callback) | |
| 374 : StateBase(watcher, callback), | |
| 375 handle_(handle) { | |
| 376 DCHECK(MessagePumpMojo::IsCurrent()); | |
| 377 | |
| 378 MessagePumpMojo::current()->AddHandler( | |
| 379 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); | |
| 380 } | |
| 381 | |
| 382 ~SameThreadWatchingState() override { | |
| 383 if (!got_ready()) | |
| 384 MessagePumpMojo::current()->RemoveHandler(handle_); | |
| 385 } | |
| 386 | |
| 387 private: | |
| 388 // MessagePumpMojoHandler overrides: | |
| 389 void OnHandleReady(const Handle& handle) override { | |
| 390 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK); | |
| 391 } | |
| 392 | |
| 393 void OnHandleError(const Handle& handle, MojoResult result) override { | |
| 394 StopWatchingAndNotifyReady(handle, result); | |
| 395 } | |
| 396 | |
| 397 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) { | |
| 398 DCHECK_EQ(handle.value(), handle_.value()); | |
| 399 MessagePumpMojo::current()->RemoveHandler(handle_); | |
| 400 NotifyHandleReady(result); | |
| 401 } | |
| 402 | |
| 403 Handle handle_; | |
| 404 | |
| 405 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); | |
| 406 }; | |
| 407 | |
| 408 // If the thread on which HandleWatcher is used runs a message pump different | |
| 409 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the | |
| 410 // handle on the handle watcher thread. | |
| 411 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { | |
| 412 public: | |
| 413 SecondaryThreadWatchingState(HandleWatcher* watcher, | |
| 414 const Handle& handle, | |
| 415 MojoHandleSignals handle_signals, | |
| 416 MojoDeadline deadline, | |
| 417 const base::Callback<void(MojoResult)>& callback) | |
| 418 : StateBase(watcher, callback), | |
| 419 weak_factory_(this) { | |
| 420 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | |
| 421 handle, | |
| 422 handle_signals, | |
| 423 MojoDeadlineToTimeTicks(deadline), | |
| 424 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, | |
| 425 weak_factory_.GetWeakPtr())); | |
| 426 } | |
| 427 | |
| 428 ~SecondaryThreadWatchingState() override { | |
| 429 // If we've been notified the handle is ready (|got_ready()| is true) then | |
| 430 // the watch has been implicitly removed by | |
| 431 // WatcherThreadManager/MessagePumpMojo and we don't have to call | |
| 432 // StopWatching(). To do so would needlessly entail posting a task and | |
| 433 // blocking until the background thread services it. | |
| 434 if (!got_ready()) | |
| 435 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | |
| 436 } | |
| 437 | |
| 438 private: | |
| 439 WatcherID watcher_id_; | |
| 440 | |
| 441 // Used to weakly bind |this| to the WatcherThreadManager. | |
| 442 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; | |
| 443 | |
| 444 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); | |
| 445 }; | |
| 446 | |
| 447 // HandleWatcher --------------------------------------------------------------- | |
| 448 | |
| 449 HandleWatcher::HandleWatcher() { | |
| 450 } | |
| 451 | |
| 452 HandleWatcher::~HandleWatcher() { | |
| 453 } | |
| 454 | |
| 455 void HandleWatcher::Start(const Handle& handle, | |
| 456 MojoHandleSignals handle_signals, | |
| 457 MojoDeadline deadline, | |
| 458 const base::Callback<void(MojoResult)>& callback) { | |
| 459 DCHECK(handle.is_valid()); | |
| 460 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); | |
| 461 | |
| 462 // Need to clear the state before creating a new one. | |
| 463 state_.reset(); | |
| 464 if (MessagePumpMojo::IsCurrent()) { | |
| 465 state_.reset(new SameThreadWatchingState( | |
| 466 this, handle, handle_signals, deadline, callback)); | |
| 467 } else { | |
| 468 state_.reset(new SecondaryThreadWatchingState( | |
| 469 this, handle, handle_signals, deadline, callback)); | |
| 470 } | |
| 471 } | |
| 472 | |
| 473 void HandleWatcher::Stop() { | |
| 474 state_.reset(); | |
| 475 } | |
| 476 | |
| 477 } // namespace common | |
| 478 } // namespace mojo | |
| OLD | NEW |