Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/common/handle_watcher.h" | 5 #include "mojo/common/handle_watcher.h" |
| 6 | 6 |
| 7 #include <map> | 7 #include <map> |
| 8 | 8 |
| 9 #include "base/atomic_sequence_num.h" | 9 #include "base/atomic_sequence_num.h" |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| 11 #include "base/lazy_instance.h" | 11 #include "base/lazy_instance.h" |
| 12 #include "base/logging.h" | |
| 13 #include "base/macros.h" | |
| 12 #include "base/memory/singleton.h" | 14 #include "base/memory/singleton.h" |
| 13 #include "base/memory/weak_ptr.h" | 15 #include "base/memory/weak_ptr.h" |
| 14 #include "base/message_loop/message_loop.h" | 16 #include "base/message_loop/message_loop.h" |
| 15 #include "base/message_loop/message_loop_proxy.h" | 17 #include "base/message_loop/message_loop_proxy.h" |
| 16 #include "base/synchronization/lock.h" | 18 #include "base/synchronization/lock.h" |
| 17 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
| 18 #include "base/threading/thread.h" | 20 #include "base/threading/thread.h" |
| 19 #include "base/threading/thread_restrictions.h" | 21 #include "base/threading/thread_restrictions.h" |
| 20 #include "base/time/time.h" | 22 #include "base/time/time.h" |
| 21 #include "mojo/common/message_pump_mojo.h" | 23 #include "mojo/common/message_pump_mojo.h" |
| 22 #include "mojo/common/message_pump_mojo_handler.h" | 24 #include "mojo/common/message_pump_mojo_handler.h" |
| 23 #include "mojo/common/time_helper.h" | 25 #include "mojo/common/time_helper.h" |
| 24 | 26 |
| 25 namespace mojo { | 27 namespace mojo { |
| 26 namespace common { | 28 namespace common { |
| 27 | 29 |
| 28 typedef int WatcherID; | 30 typedef int WatcherID; |
| 29 | 31 |
| 30 namespace { | 32 namespace { |
| 31 | 33 |
| 32 const char kWatcherThreadName[] = "handle-watcher-thread"; | 34 const char kWatcherThreadName[] = "handle-watcher-thread"; |
| 33 | 35 |
| 34 // TODO(sky): this should be unnecessary once MessageLoop has been refactored. | |
| 35 MessagePumpMojo* message_pump_mojo = NULL; | |
| 36 | |
| 37 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { | |
| 38 message_pump_mojo = new MessagePumpMojo; | |
| 39 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass(); | |
| 40 } | |
| 41 | |
| 42 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { | 36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { |
| 43 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : | 37 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : |
| 44 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); | 38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); |
| 45 } | 39 } |
| 46 | 40 |
| 47 // Tracks the data for a single call to Start(). | 41 // Tracks the data for a single call to Start(). |
| 48 struct WatchData { | 42 struct WatchData { |
| 49 WatchData() | 43 WatchData() |
| 50 : id(0), | 44 : id(0), |
| 51 handle_signals(MOJO_HANDLE_SIGNAL_NONE), | 45 handle_signals(MOJO_HANDLE_SIGNAL_NONE), |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 99 | 93 |
| 100 WatcherBackend::~WatcherBackend() { | 94 WatcherBackend::~WatcherBackend() { |
| 101 } | 95 } |
| 102 | 96 |
| 103 void WatcherBackend::StartWatching(const WatchData& data) { | 97 void WatcherBackend::StartWatching(const WatchData& data) { |
| 104 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); | 98 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
| 105 | 99 |
| 106 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); | 100 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
| 107 | 101 |
| 108 handle_to_data_[data.handle] = data; | 102 handle_to_data_[data.handle] = data; |
| 109 message_pump_mojo->AddHandler(this, data.handle, | 103 MessagePumpMojo::current()->AddHandler(this, data.handle, |
| 110 data.handle_signals, | 104 data.handle_signals, |
| 111 data.deadline); | 105 data.deadline); |
| 112 } | 106 } |
| 113 | 107 |
| 114 void WatcherBackend::StopWatching(WatcherID watcher_id) { | 108 void WatcherBackend::StopWatching(WatcherID watcher_id) { |
| 115 // Because of the thread hop it is entirely possible to get here and not | 109 // Because of the thread hop it is entirely possible to get here and not |
| 116 // have a valid handle registered for |watcher_id|. | 110 // have a valid handle registered for |watcher_id|. |
| 117 Handle handle; | 111 Handle handle; |
| 118 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | 112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { |
| 119 handle_to_data_.erase(handle); | 113 handle_to_data_.erase(handle); |
| 120 message_pump_mojo->RemoveHandler(handle); | 114 MessagePumpMojo::current()->RemoveHandler(handle); |
| 121 } | 115 } |
| 122 } | 116 } |
| 123 | 117 |
| 124 void WatcherBackend::RemoveAndNotify(const Handle& handle, | 118 void WatcherBackend::RemoveAndNotify(const Handle& handle, |
| 125 MojoResult result) { | 119 MojoResult result) { |
| 126 if (handle_to_data_.count(handle) == 0) | 120 if (handle_to_data_.count(handle) == 0) |
| 127 return; | 121 return; |
| 128 | 122 |
| 129 const WatchData data(handle_to_data_[handle]); | 123 const WatchData data(handle_to_data_[handle]); |
| 130 handle_to_data_.erase(handle); | 124 handle_to_data_.erase(handle); |
| 131 message_pump_mojo->RemoveHandler(handle); | 125 MessagePumpMojo::current()->RemoveHandler(handle); |
| 132 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); | 126 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); |
| 133 } | 127 } |
| 134 | 128 |
| 135 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, | 129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, |
| 136 Handle* handle) const { | 130 Handle* handle) const { |
| 137 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); | 131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); |
| 138 i != handle_to_data_.end(); ++i) { | 132 i != handle_to_data_.end(); ++i) { |
| 139 if (i->second.id == watcher_id) { | 133 if (i->second.id == watcher_id) { |
| 140 *handle = i->second.handle; | 134 *handle = i->second.handle; |
| 141 return true; | 135 return true; |
| (...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 305 } else { | 299 } else { |
| 306 backend_.StopWatching(requests[i].stop_id); | 300 backend_.StopWatching(requests[i].stop_id); |
| 307 requests[i].stop_event->Signal(); | 301 requests[i].stop_event->Signal(); |
| 308 } | 302 } |
| 309 } | 303 } |
| 310 } | 304 } |
| 311 | 305 |
| 312 WatcherThreadManager::WatcherThreadManager() | 306 WatcherThreadManager::WatcherThreadManager() |
| 313 : thread_(kWatcherThreadName) { | 307 : thread_(kWatcherThreadName) { |
| 314 base::Thread::Options thread_options; | 308 base::Thread::Options thread_options; |
| 315 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); | 309 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); |
| 316 thread_.StartWithOptions(thread_options); | 310 thread_.StartWithOptions(thread_options); |
| 317 } | 311 } |
| 318 | 312 |
| 319 // HandleWatcher::State -------------------------------------------------------- | 313 // HandleWatcher::StateBase and subclasses ------------------------------------- |
| 320 | 314 |
| 321 // Represents the state of the HandleWatcher. Owns the user's callback and | 315 // The base class of HandleWatcher's state. Owns the user's callback and |
| 322 // monitors the current thread's MessageLoop to know when to force the callback | 316 // monitors the current thread's MessageLoop to know when to force the callback |
| 323 // to run (with an error) even though the pipe hasn't been signaled yet. | 317 // to run (with an error) even though the pipe hasn't been signaled yet. |
| 324 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { | 318 // |
| 319 // TODO(yzshen): Very soon MessagePumpMojo will be changed to notify all handles | |
|
yzshen1
2014/08/28 19:26:05
(Actually we don't have such a plan, so removed th
| |
| 320 // on its destruction. When that is landed, we shouldn't need the | |
| 321 // DestructionObserver for SameThreadWatchingState. | |
| 322 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { | |
| 325 public: | 323 public: |
| 326 State(HandleWatcher* watcher, | 324 StateBase(HandleWatcher* watcher, |
| 327 const Handle& handle, | 325 const base::Callback<void(MojoResult)>& callback) |
| 328 MojoHandleSignals handle_signals, | |
| 329 MojoDeadline deadline, | |
| 330 const base::Callback<void(MojoResult)>& callback) | |
| 331 : watcher_(watcher), | 326 : watcher_(watcher), |
| 332 callback_(callback), | 327 callback_(callback), |
| 333 got_ready_(false), | 328 got_ready_(false) { |
| 334 weak_factory_(this) { | |
| 335 base::MessageLoop::current()->AddDestructionObserver(this); | 329 base::MessageLoop::current()->AddDestructionObserver(this); |
| 336 | |
| 337 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | |
| 338 handle, | |
| 339 handle_signals, | |
| 340 MojoDeadlineToTimeTicks(deadline), | |
| 341 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr())); | |
| 342 } | 330 } |
| 343 | 331 |
| 344 virtual ~State() { | 332 virtual ~StateBase() { |
| 345 base::MessageLoop::current()->RemoveDestructionObserver(this); | 333 base::MessageLoop::current()->RemoveDestructionObserver(this); |
| 334 } | |
| 346 | 335 |
| 347 // If we've been notified the handle is ready (|got_ready_| is true) then | 336 void NotifyHandleReady(MojoResult result) { |
|
sky
2014/08/27 22:49:45
nit: make this (and got_ready()) protected.
yzshen1
2014/08/28 19:26:05
Done.
| |
| 348 // the watch has been implicitly removed by | 337 got_ready_ = true; |
| 349 // WatcherThreadManager/MessagePumpMojo and we don't have to call | 338 NotifyAndDestroy(result); |
| 350 // StopWatching(). To do so would needlessly entail posting a task and | |
| 351 // blocking until the background thread services it. | |
| 352 if (!got_ready_) | |
| 353 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | |
| 354 } | 339 } |
| 355 | 340 |
| 341 bool got_ready() const { return got_ready_; } | |
| 342 | |
| 356 private: | 343 private: |
| 357 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { | 344 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { |
| 358 // The current thread is exiting. Simulate a watch error. | 345 // The current thread is exiting. Simulate a watch error. |
| 359 NotifyAndDestroy(MOJO_RESULT_ABORTED); | 346 NotifyAndDestroy(MOJO_RESULT_ABORTED); |
| 360 } | 347 } |
| 361 | 348 |
| 362 void OnHandleReady(MojoResult result) { | |
| 363 got_ready_ = true; | |
| 364 NotifyAndDestroy(result); | |
| 365 } | |
| 366 | |
| 367 void NotifyAndDestroy(MojoResult result) { | 349 void NotifyAndDestroy(MojoResult result) { |
| 368 base::Callback<void(MojoResult)> callback = callback_; | 350 base::Callback<void(MojoResult)> callback = callback_; |
| 369 watcher_->Stop(); // Destroys |this|. | 351 watcher_->Stop(); // Destroys |this|. |
| 370 | 352 |
| 371 callback.Run(result); | 353 callback.Run(result); |
| 372 } | 354 } |
| 373 | 355 |
| 374 HandleWatcher* watcher_; | 356 HandleWatcher* watcher_; |
| 375 WatcherID watcher_id_; | |
| 376 base::Callback<void(MojoResult)> callback_; | 357 base::Callback<void(MojoResult)> callback_; |
| 377 | 358 |
| 378 // Have we been notified that the handle is ready? | 359 // Have we been notified that the handle is ready? |
| 379 bool got_ready_; | 360 bool got_ready_; |
| 380 | 361 |
| 362 DISALLOW_COPY_AND_ASSIGN(StateBase); | |
| 363 }; | |
| 364 | |
| 365 // If the thread on which HandleWatcher is used runs MessagePumpMojo, | |
| 366 // SameThreadWatchingState is used to directly watch the handle on the same | |
| 367 // thread. | |
| 368 class HandleWatcher::SameThreadWatchingState : public StateBase, | |
| 369 public MessagePumpMojoHandler { | |
| 370 public: | |
| 371 SameThreadWatchingState(HandleWatcher* watcher, | |
| 372 const Handle& handle, | |
| 373 MojoHandleSignals handle_signals, | |
| 374 MojoDeadline deadline, | |
| 375 const base::Callback<void(MojoResult)>& callback) | |
| 376 : StateBase(watcher, callback), | |
| 377 handle_(handle) { | |
| 378 DCHECK(MessagePumpMojo::IsCurrent()); | |
| 379 | |
| 380 MessagePumpMojo::current()->AddHandler( | |
| 381 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); | |
| 382 } | |
| 383 | |
| 384 virtual ~SameThreadWatchingState() { | |
| 385 if (!got_ready()) | |
| 386 MessagePumpMojo::current()->RemoveHandler(handle_); | |
| 387 } | |
| 388 | |
| 389 private: | |
| 390 // MessagePumpMojoHandler overrides: | |
| 391 virtual void OnHandleReady(const Handle& handle) OVERRIDE { | |
| 392 DCHECK_EQ(handle.value(), handle_.value()); | |
|
sky
2014/08/27 22:49:45
nit: refactor this and OnHandleError into a common
yzshen1
2014/08/28 19:26:05
Done.
| |
| 393 MessagePumpMojo::current()->RemoveHandler(handle_); | |
| 394 NotifyHandleReady(MOJO_RESULT_OK); | |
| 395 } | |
| 396 | |
| 397 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE { | |
| 398 DCHECK_EQ(handle.value(), handle_.value()); | |
| 399 MessagePumpMojo::current()->RemoveHandler(handle_); | |
| 400 NotifyHandleReady(result); | |
| 401 } | |
| 402 | |
| 403 Handle handle_; | |
| 404 | |
| 405 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); | |
| 406 }; | |
| 407 | |
| 408 // If the thread on which HandleWatcher is used runs a message pump different | |
| 409 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the | |
| 410 // handle on the handle watcher thread. | |
| 411 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { | |
| 412 public: | |
| 413 SecondaryThreadWatchingState(HandleWatcher* watcher, | |
| 414 const Handle& handle, | |
| 415 MojoHandleSignals handle_signals, | |
| 416 MojoDeadline deadline, | |
| 417 const base::Callback<void(MojoResult)>& callback) | |
| 418 : StateBase(watcher, callback), | |
| 419 weak_factory_(this) { | |
| 420 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | |
| 421 handle, | |
| 422 handle_signals, | |
| 423 MojoDeadlineToTimeTicks(deadline), | |
| 424 base::Bind(&StateBase::NotifyHandleReady, weak_factory_.GetWeakPtr())); | |
| 425 } | |
| 426 | |
| 427 virtual ~SecondaryThreadWatchingState() { | |
| 428 // If we've been notified the handle is ready (|got_ready()| is true) then | |
| 429 // the watch has been implicitly removed by | |
| 430 // WatcherThreadManager/MessagePumpMojo and we don't have to call | |
| 431 // StopWatching(). To do so would needlessly entail posting a task and | |
| 432 // blocking until the background thread services it. | |
| 433 if (!got_ready()) | |
| 434 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | |
| 435 } | |
| 436 | |
| 437 private: | |
| 438 WatcherID watcher_id_; | |
| 439 | |
| 381 // Used to weakly bind |this| to the WatcherThreadManager. | 440 // Used to weakly bind |this| to the WatcherThreadManager. |
| 382 base::WeakPtrFactory<State> weak_factory_; | 441 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; |
| 442 | |
| 443 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); | |
| 383 }; | 444 }; |
| 384 | 445 |
| 385 // HandleWatcher --------------------------------------------------------------- | 446 // HandleWatcher --------------------------------------------------------------- |
| 386 | 447 |
| 387 HandleWatcher::HandleWatcher() { | 448 HandleWatcher::HandleWatcher() { |
| 388 } | 449 } |
| 389 | 450 |
| 390 HandleWatcher::~HandleWatcher() { | 451 HandleWatcher::~HandleWatcher() { |
| 391 } | 452 } |
| 392 | 453 |
| 393 void HandleWatcher::Start(const Handle& handle, | 454 void HandleWatcher::Start(const Handle& handle, |
| 394 MojoHandleSignals handle_signals, | 455 MojoHandleSignals handle_signals, |
| 395 MojoDeadline deadline, | 456 MojoDeadline deadline, |
| 396 const base::Callback<void(MojoResult)>& callback) { | 457 const base::Callback<void(MojoResult)>& callback) { |
| 397 DCHECK(handle.is_valid()); | 458 DCHECK(handle.is_valid()); |
| 398 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); | 459 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); |
| 399 | 460 |
| 400 state_.reset(new State(this, handle, handle_signals, deadline, callback)); | 461 if (MessagePumpMojo::IsCurrent()) { |
| 462 state_.reset(new SameThreadWatchingState( | |
| 463 this, handle, handle_signals, deadline, callback)); | |
| 464 } else { | |
| 465 state_.reset(new SecondaryThreadWatchingState( | |
| 466 this, handle, handle_signals, deadline, callback)); | |
| 467 } | |
| 401 } | 468 } |
| 402 | 469 |
| 403 void HandleWatcher::Stop() { | 470 void HandleWatcher::Stop() { |
| 404 state_.reset(); | 471 state_.reset(); |
| 405 } | 472 } |
| 406 | 473 |
| 407 } // namespace common | 474 } // namespace common |
| 408 } // namespace mojo | 475 } // namespace mojo |
| OLD | NEW |