OLD | NEW |
(Empty) | |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/common/handle_watcher.h" |
| 6 |
| 7 #include <map> |
| 8 |
| 9 #include "base/atomic_sequence_num.h" |
| 10 #include "base/bind.h" |
| 11 #include "base/lazy_instance.h" |
| 12 #include "base/logging.h" |
| 13 #include "base/macros.h" |
| 14 #include "base/memory/singleton.h" |
| 15 #include "base/memory/weak_ptr.h" |
| 16 #include "base/message_loop/message_loop.h" |
| 17 #include "base/single_thread_task_runner.h" |
| 18 #include "base/synchronization/lock.h" |
| 19 #include "base/synchronization/waitable_event.h" |
| 20 #include "base/thread_task_runner_handle.h" |
| 21 #include "base/threading/thread.h" |
| 22 #include "base/threading/thread_restrictions.h" |
| 23 #include "base/time/time.h" |
| 24 #include "mojo/common/message_pump_mojo.h" |
| 25 #include "mojo/common/message_pump_mojo_handler.h" |
| 26 #include "mojo/common/time_helper.h" |
| 27 |
| 28 namespace mojo { |
| 29 namespace common { |
| 30 |
| 31 typedef int WatcherID; |
| 32 |
| 33 namespace { |
| 34 |
| 35 const char kWatcherThreadName[] = "handle-watcher-thread"; |
| 36 |
| 37 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { |
| 38 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : |
| 39 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); |
| 40 } |
| 41 |
| 42 // Tracks the data for a single call to Start(). |
| 43 struct WatchData { |
| 44 WatchData() |
| 45 : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {} |
| 46 |
| 47 WatcherID id; |
| 48 Handle handle; |
| 49 MojoHandleSignals handle_signals; |
| 50 base::TimeTicks deadline; |
| 51 base::Callback<void(MojoResult)> callback; |
| 52 scoped_refptr<base::SingleThreadTaskRunner> task_runner; |
| 53 }; |
| 54 |
| 55 // WatcherBackend -------------------------------------------------------------- |
| 56 |
| 57 // WatcherBackend is responsible for managing the requests and interacting with |
| 58 // MessagePumpMojo. All access (outside of creation/destruction) is done on the |
| 59 // thread WatcherThreadManager creates. |
| 60 class WatcherBackend : public MessagePumpMojoHandler { |
| 61 public: |
| 62 WatcherBackend(); |
| 63 ~WatcherBackend() override; |
| 64 |
| 65 void StartWatching(const WatchData& data); |
| 66 |
| 67 // Cancels a previously scheduled request to start a watch. |
| 68 void StopWatching(WatcherID watcher_id); |
| 69 |
| 70 private: |
| 71 typedef std::map<Handle, WatchData> HandleToWatchDataMap; |
| 72 |
| 73 // Invoked when a handle needs to be removed and notified. |
| 74 void RemoveAndNotify(const Handle& handle, MojoResult result); |
| 75 |
| 76 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found |
| 77 // and sets |handle| to the Handle. Returns false if not a known id. |
| 78 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; |
| 79 |
| 80 // MessagePumpMojoHandler overrides: |
| 81 void OnHandleReady(const Handle& handle) override; |
| 82 void OnHandleError(const Handle& handle, MojoResult result) override; |
| 83 |
| 84 // Maps from assigned id to WatchData. |
| 85 HandleToWatchDataMap handle_to_data_; |
| 86 |
| 87 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); |
| 88 }; |
| 89 |
| 90 WatcherBackend::WatcherBackend() { |
| 91 } |
| 92 |
| 93 WatcherBackend::~WatcherBackend() { |
| 94 } |
| 95 |
| 96 void WatcherBackend::StartWatching(const WatchData& data) { |
| 97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
| 98 |
| 99 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
| 100 |
| 101 handle_to_data_[data.handle] = data; |
| 102 MessagePumpMojo::current()->AddHandler(this, data.handle, |
| 103 data.handle_signals, |
| 104 data.deadline); |
| 105 } |
| 106 |
| 107 void WatcherBackend::StopWatching(WatcherID watcher_id) { |
| 108 // Because of the thread hop it is entirely possible to get here and not |
| 109 // have a valid handle registered for |watcher_id|. |
| 110 Handle handle; |
| 111 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { |
| 112 handle_to_data_.erase(handle); |
| 113 MessagePumpMojo::current()->RemoveHandler(handle); |
| 114 } |
| 115 } |
| 116 |
| 117 void WatcherBackend::RemoveAndNotify(const Handle& handle, |
| 118 MojoResult result) { |
| 119 if (handle_to_data_.count(handle) == 0) |
| 120 return; |
| 121 |
| 122 const WatchData data(handle_to_data_[handle]); |
| 123 handle_to_data_.erase(handle); |
| 124 MessagePumpMojo::current()->RemoveHandler(handle); |
| 125 |
| 126 data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result)); |
| 127 } |
| 128 |
| 129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, |
| 130 Handle* handle) const { |
| 131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); |
| 132 i != handle_to_data_.end(); ++i) { |
| 133 if (i->second.id == watcher_id) { |
| 134 *handle = i->second.handle; |
| 135 return true; |
| 136 } |
| 137 } |
| 138 return false; |
| 139 } |
| 140 |
| 141 void WatcherBackend::OnHandleReady(const Handle& handle) { |
| 142 RemoveAndNotify(handle, MOJO_RESULT_OK); |
| 143 } |
| 144 |
| 145 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { |
| 146 RemoveAndNotify(handle, result); |
| 147 } |
| 148 |
| 149 // WatcherThreadManager -------------------------------------------------------- |
| 150 |
| 151 // WatcherThreadManager manages the background thread that listens for handles |
| 152 // to be ready. All requests are handled by WatcherBackend. |
| 153 } // namespace |
| 154 |
| 155 class WatcherThreadManager { |
| 156 public: |
| 157 ~WatcherThreadManager(); |
| 158 |
| 159 // Returns the shared instance. |
| 160 static WatcherThreadManager* GetInstance(); |
| 161 |
| 162 // Starts watching the requested handle. Returns a unique ID that is used to |
| 163 // stop watching the handle. When the handle is ready |callback| is notified |
| 164 // on the thread StartWatching() was invoked on. |
| 165 // This may be invoked on any thread. |
| 166 WatcherID StartWatching(const Handle& handle, |
| 167 MojoHandleSignals handle_signals, |
| 168 base::TimeTicks deadline, |
| 169 const base::Callback<void(MojoResult)>& callback); |
| 170 |
| 171 // Stops watching a handle. |
| 172 // This may be invoked on any thread. |
| 173 void StopWatching(WatcherID watcher_id); |
| 174 |
| 175 private: |
| 176 enum RequestType { |
| 177 REQUEST_START, |
| 178 REQUEST_STOP, |
| 179 }; |
| 180 |
| 181 // See description of |requests_| for details. |
| 182 struct RequestData { |
| 183 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} |
| 184 |
| 185 RequestType type; |
| 186 WatchData start_data; |
| 187 WatcherID stop_id; |
| 188 base::WaitableEvent* stop_event; |
| 189 }; |
| 190 |
| 191 typedef std::vector<RequestData> Requests; |
| 192 |
| 193 friend struct DefaultSingletonTraits<WatcherThreadManager>; |
| 194 |
| 195 WatcherThreadManager(); |
| 196 |
| 197 // Schedules a request on the background thread. See |requests_| for details. |
| 198 void AddRequest(const RequestData& data); |
| 199 |
| 200 // Processes requests added to |requests_|. This is invoked on the backend |
| 201 // thread. |
| 202 void ProcessRequestsOnBackendThread(); |
| 203 |
| 204 base::Thread thread_; |
| 205 |
| 206 base::AtomicSequenceNumber watcher_id_generator_; |
| 207 |
| 208 WatcherBackend backend_; |
| 209 |
| 210 // Protects |requests_|. |
| 211 base::Lock lock_; |
| 212 |
| 213 // Start/Stop result in adding a RequestData to |requests_| (protected by |
| 214 // |lock_|). When the background thread wakes up it processes the requests. |
| 215 Requests requests_; |
| 216 |
| 217 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
| 218 }; |
| 219 |
| 220 WatcherThreadManager::~WatcherThreadManager() { |
| 221 thread_.Stop(); |
| 222 } |
| 223 |
| 224 WatcherThreadManager* WatcherThreadManager::GetInstance() { |
| 225 return Singleton<WatcherThreadManager>::get(); |
| 226 } |
| 227 |
| 228 WatcherID WatcherThreadManager::StartWatching( |
| 229 const Handle& handle, |
| 230 MojoHandleSignals handle_signals, |
| 231 base::TimeTicks deadline, |
| 232 const base::Callback<void(MojoResult)>& callback) { |
| 233 RequestData request_data; |
| 234 request_data.type = REQUEST_START; |
| 235 request_data.start_data.id = watcher_id_generator_.GetNext(); |
| 236 request_data.start_data.handle = handle; |
| 237 request_data.start_data.callback = callback; |
| 238 request_data.start_data.handle_signals = handle_signals; |
| 239 request_data.start_data.deadline = deadline; |
| 240 request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get(); |
| 241 AddRequest(request_data); |
| 242 return request_data.start_data.id; |
| 243 } |
| 244 |
| 245 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
| 246 // Handle the case of StartWatching() followed by StopWatching() before |
| 247 // |thread_| woke up. |
| 248 { |
| 249 base::AutoLock auto_lock(lock_); |
| 250 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { |
| 251 if (i->type == REQUEST_START && i->start_data.id == watcher_id) { |
| 252 // Watcher ids are not reused, so if we find it we can stop. |
| 253 requests_.erase(i); |
| 254 return; |
| 255 } |
| 256 } |
| 257 } |
| 258 |
| 259 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 260 base::WaitableEvent event(true, false); |
| 261 RequestData request_data; |
| 262 request_data.type = REQUEST_STOP; |
| 263 request_data.stop_id = watcher_id; |
| 264 request_data.stop_event = &event; |
| 265 AddRequest(request_data); |
| 266 |
| 267 // We need to block until the handle is actually removed. |
| 268 event.Wait(); |
| 269 } |
| 270 |
| 271 void WatcherThreadManager::AddRequest(const RequestData& data) { |
| 272 { |
| 273 base::AutoLock auto_lock(lock_); |
| 274 const bool was_empty = requests_.empty(); |
| 275 requests_.push_back(data); |
| 276 if (!was_empty) |
| 277 return; |
| 278 } |
| 279 // We own |thread_|, so it's safe to use Unretained() here. |
| 280 thread_.task_runner()->PostTask( |
| 281 FROM_HERE, |
| 282 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, |
| 283 base::Unretained(this))); |
| 284 } |
| 285 |
| 286 void WatcherThreadManager::ProcessRequestsOnBackendThread() { |
| 287 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); |
| 288 |
| 289 Requests requests; |
| 290 { |
| 291 base::AutoLock auto_lock(lock_); |
| 292 requests_.swap(requests); |
| 293 } |
| 294 for (size_t i = 0; i < requests.size(); ++i) { |
| 295 if (requests[i].type == REQUEST_START) { |
| 296 backend_.StartWatching(requests[i].start_data); |
| 297 } else { |
| 298 backend_.StopWatching(requests[i].stop_id); |
| 299 requests[i].stop_event->Signal(); |
| 300 } |
| 301 } |
| 302 } |
| 303 |
| 304 WatcherThreadManager::WatcherThreadManager() |
| 305 : thread_(kWatcherThreadName) { |
| 306 base::Thread::Options thread_options; |
| 307 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); |
| 308 thread_.StartWithOptions(thread_options); |
| 309 } |
| 310 |
| 311 // HandleWatcher::StateBase and subclasses ------------------------------------- |
| 312 |
| 313 // The base class of HandleWatcher's state. Owns the user's callback and |
| 314 // monitors the current thread's MessageLoop to know when to force the callback |
| 315 // to run (with an error) even though the pipe hasn't been signaled yet. |
| 316 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { |
| 317 public: |
| 318 StateBase(HandleWatcher* watcher, |
| 319 const base::Callback<void(MojoResult)>& callback) |
| 320 : watcher_(watcher), |
| 321 callback_(callback), |
| 322 got_ready_(false) { |
| 323 base::MessageLoop::current()->AddDestructionObserver(this); |
| 324 } |
| 325 |
| 326 ~StateBase() override { |
| 327 base::MessageLoop::current()->RemoveDestructionObserver(this); |
| 328 } |
| 329 |
| 330 protected: |
| 331 void NotifyHandleReady(MojoResult result) { |
| 332 got_ready_ = true; |
| 333 NotifyAndDestroy(result); |
| 334 } |
| 335 |
| 336 bool got_ready() const { return got_ready_; } |
| 337 |
| 338 private: |
| 339 void WillDestroyCurrentMessageLoop() override { |
| 340 // The current thread is exiting. Simulate a watch error. |
| 341 NotifyAndDestroy(MOJO_RESULT_ABORTED); |
| 342 } |
| 343 |
| 344 void NotifyAndDestroy(MojoResult result) { |
| 345 base::Callback<void(MojoResult)> callback = callback_; |
| 346 watcher_->Stop(); // Destroys |this|. |
| 347 |
| 348 callback.Run(result); |
| 349 } |
| 350 |
| 351 HandleWatcher* watcher_; |
| 352 base::Callback<void(MojoResult)> callback_; |
| 353 |
| 354 // Have we been notified that the handle is ready? |
| 355 bool got_ready_; |
| 356 |
| 357 DISALLOW_COPY_AND_ASSIGN(StateBase); |
| 358 }; |
| 359 |
| 360 // If the thread on which HandleWatcher is used runs MessagePumpMojo, |
| 361 // SameThreadWatchingState is used to directly watch the handle on the same |
| 362 // thread. |
| 363 class HandleWatcher::SameThreadWatchingState : public StateBase, |
| 364 public MessagePumpMojoHandler { |
| 365 public: |
| 366 SameThreadWatchingState(HandleWatcher* watcher, |
| 367 const Handle& handle, |
| 368 MojoHandleSignals handle_signals, |
| 369 MojoDeadline deadline, |
| 370 const base::Callback<void(MojoResult)>& callback) |
| 371 : StateBase(watcher, callback), |
| 372 handle_(handle) { |
| 373 DCHECK(MessagePumpMojo::IsCurrent()); |
| 374 |
| 375 MessagePumpMojo::current()->AddHandler( |
| 376 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); |
| 377 } |
| 378 |
| 379 ~SameThreadWatchingState() override { |
| 380 if (!got_ready()) |
| 381 MessagePumpMojo::current()->RemoveHandler(handle_); |
| 382 } |
| 383 |
| 384 private: |
| 385 // MessagePumpMojoHandler overrides: |
| 386 void OnHandleReady(const Handle& handle) override { |
| 387 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK); |
| 388 } |
| 389 |
| 390 void OnHandleError(const Handle& handle, MojoResult result) override { |
| 391 StopWatchingAndNotifyReady(handle, result); |
| 392 } |
| 393 |
| 394 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) { |
| 395 DCHECK_EQ(handle.value(), handle_.value()); |
| 396 MessagePumpMojo::current()->RemoveHandler(handle_); |
| 397 NotifyHandleReady(result); |
| 398 } |
| 399 |
| 400 Handle handle_; |
| 401 |
| 402 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); |
| 403 }; |
| 404 |
| 405 // If the thread on which HandleWatcher is used runs a message pump different |
| 406 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the |
| 407 // handle on the handle watcher thread. |
| 408 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { |
| 409 public: |
| 410 SecondaryThreadWatchingState(HandleWatcher* watcher, |
| 411 const Handle& handle, |
| 412 MojoHandleSignals handle_signals, |
| 413 MojoDeadline deadline, |
| 414 const base::Callback<void(MojoResult)>& callback) |
| 415 : StateBase(watcher, callback), |
| 416 weak_factory_(this) { |
| 417 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( |
| 418 handle, |
| 419 handle_signals, |
| 420 MojoDeadlineToTimeTicks(deadline), |
| 421 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, |
| 422 weak_factory_.GetWeakPtr())); |
| 423 } |
| 424 |
| 425 ~SecondaryThreadWatchingState() override { |
| 426 // If we've been notified the handle is ready (|got_ready()| is true) then |
| 427 // the watch has been implicitly removed by |
| 428 // WatcherThreadManager/MessagePumpMojo and we don't have to call |
| 429 // StopWatching(). To do so would needlessly entail posting a task and |
| 430 // blocking until the background thread services it. |
| 431 if (!got_ready()) |
| 432 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); |
| 433 } |
| 434 |
| 435 private: |
| 436 WatcherID watcher_id_; |
| 437 |
| 438 // Used to weakly bind |this| to the WatcherThreadManager. |
| 439 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; |
| 440 |
| 441 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); |
| 442 }; |
| 443 |
| 444 // HandleWatcher --------------------------------------------------------------- |
| 445 |
| 446 HandleWatcher::HandleWatcher() { |
| 447 } |
| 448 |
| 449 HandleWatcher::~HandleWatcher() { |
| 450 } |
| 451 |
| 452 void HandleWatcher::Start(const Handle& handle, |
| 453 MojoHandleSignals handle_signals, |
| 454 MojoDeadline deadline, |
| 455 const base::Callback<void(MojoResult)>& callback) { |
| 456 DCHECK(handle.is_valid()); |
| 457 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); |
| 458 |
| 459 // Need to clear the state before creating a new one. |
| 460 state_.reset(); |
| 461 if (MessagePumpMojo::IsCurrent()) { |
| 462 state_.reset(new SameThreadWatchingState( |
| 463 this, handle, handle_signals, deadline, callback)); |
| 464 } else { |
| 465 state_.reset(new SecondaryThreadWatchingState( |
| 466 this, handle, handle_signals, deadline, callback)); |
| 467 } |
| 468 } |
| 469 |
| 470 void HandleWatcher::Stop() { |
| 471 state_.reset(); |
| 472 } |
| 473 |
| 474 } // namespace common |
| 475 } // namespace mojo |
OLD | NEW |