OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/common/handle_watcher.h" | 5 #include "mojo/common/handle_watcher.h" |
6 | 6 |
7 #include <map> | 7 #include <map> |
8 | 8 |
9 #include "base/atomic_sequence_num.h" | 9 #include "base/atomic_sequence_num.h" |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/lazy_instance.h" | 11 #include "base/lazy_instance.h" |
12 #include "base/logging.h" | |
12 #include "base/memory/singleton.h" | 13 #include "base/memory/singleton.h" |
13 #include "base/memory/weak_ptr.h" | 14 #include "base/memory/weak_ptr.h" |
14 #include "base/message_loop/message_loop.h" | 15 #include "base/message_loop/message_loop.h" |
15 #include "base/message_loop/message_loop_proxy.h" | 16 #include "base/message_loop/message_loop_proxy.h" |
16 #include "base/synchronization/lock.h" | 17 #include "base/synchronization/lock.h" |
17 #include "base/synchronization/waitable_event.h" | 18 #include "base/synchronization/waitable_event.h" |
18 #include "base/threading/thread.h" | 19 #include "base/threading/thread.h" |
19 #include "base/threading/thread_restrictions.h" | 20 #include "base/threading/thread_restrictions.h" |
20 #include "base/time/time.h" | 21 #include "base/time/time.h" |
21 #include "mojo/common/message_pump_mojo.h" | 22 #include "mojo/common/message_pump_mojo.h" |
22 #include "mojo/common/message_pump_mojo_handler.h" | 23 #include "mojo/common/message_pump_mojo_handler.h" |
23 #include "mojo/common/time_helper.h" | 24 #include "mojo/common/time_helper.h" |
24 | 25 |
25 namespace mojo { | 26 namespace mojo { |
26 namespace common { | 27 namespace common { |
27 | 28 |
28 typedef int WatcherID; | 29 typedef int WatcherID; |
29 | 30 |
30 namespace { | 31 namespace { |
31 | 32 |
32 const char kWatcherThreadName[] = "handle-watcher-thread"; | 33 const char kWatcherThreadName[] = "handle-watcher-thread"; |
33 | 34 |
34 // TODO(sky): this should be unnecessary once MessageLoop has been refactored. | |
35 MessagePumpMojo* message_pump_mojo = NULL; | |
36 | |
37 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { | |
38 message_pump_mojo = new MessagePumpMojo; | |
39 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass(); | |
40 } | |
41 | |
42 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { | 35 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { |
43 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : | 36 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : |
44 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); | 37 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); |
45 } | 38 } |
46 | 39 |
47 // Tracks the data for a single call to Start(). | 40 // Tracks the data for a single call to Start(). |
48 struct WatchData { | 41 struct WatchData { |
49 WatchData() | 42 WatchData() |
50 : id(0), | 43 : id(0), |
51 handle_signals(MOJO_HANDLE_SIGNAL_NONE), | 44 handle_signals(MOJO_HANDLE_SIGNAL_NONE), |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
99 | 92 |
100 WatcherBackend::~WatcherBackend() { | 93 WatcherBackend::~WatcherBackend() { |
101 } | 94 } |
102 | 95 |
103 void WatcherBackend::StartWatching(const WatchData& data) { | 96 void WatcherBackend::StartWatching(const WatchData& data) { |
104 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); | 97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
105 | 98 |
106 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); | 99 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
107 | 100 |
108 handle_to_data_[data.handle] = data; | 101 handle_to_data_[data.handle] = data; |
109 message_pump_mojo->AddHandler(this, data.handle, | 102 MessagePumpMojo::current()->AddHandler(this, data.handle, |
110 data.handle_signals, | 103 data.handle_signals, |
111 data.deadline); | 104 data.deadline); |
112 } | 105 } |
113 | 106 |
114 void WatcherBackend::StopWatching(WatcherID watcher_id) { | 107 void WatcherBackend::StopWatching(WatcherID watcher_id) { |
115 // Because of the thread hop it is entirely possible to get here and not | 108 // Because of the thread hop it is entirely possible to get here and not |
116 // have a valid handle registered for |watcher_id|. | 109 // have a valid handle registered for |watcher_id|. |
117 Handle handle; | 110 Handle handle; |
118 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | 111 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { |
119 handle_to_data_.erase(handle); | 112 handle_to_data_.erase(handle); |
120 message_pump_mojo->RemoveHandler(handle); | 113 MessagePumpMojo::current()->RemoveHandler(handle); |
121 } | 114 } |
122 } | 115 } |
123 | 116 |
124 void WatcherBackend::RemoveAndNotify(const Handle& handle, | 117 void WatcherBackend::RemoveAndNotify(const Handle& handle, |
125 MojoResult result) { | 118 MojoResult result) { |
126 if (handle_to_data_.count(handle) == 0) | 119 if (handle_to_data_.count(handle) == 0) |
127 return; | 120 return; |
128 | 121 |
129 const WatchData data(handle_to_data_[handle]); | 122 const WatchData data(handle_to_data_[handle]); |
130 handle_to_data_.erase(handle); | 123 handle_to_data_.erase(handle); |
131 message_pump_mojo->RemoveHandler(handle); | 124 MessagePumpMojo::current()->RemoveHandler(handle); |
132 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); | 125 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); |
133 } | 126 } |
134 | 127 |
135 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, | 128 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, |
136 Handle* handle) const { | 129 Handle* handle) const { |
137 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); | 130 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); |
138 i != handle_to_data_.end(); ++i) { | 131 i != handle_to_data_.end(); ++i) { |
139 if (i->second.id == watcher_id) { | 132 if (i->second.id == watcher_id) { |
140 *handle = i->second.handle; | 133 *handle = i->second.handle; |
141 return true; | 134 return true; |
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
305 } else { | 298 } else { |
306 backend_.StopWatching(requests[i].stop_id); | 299 backend_.StopWatching(requests[i].stop_id); |
307 requests[i].stop_event->Signal(); | 300 requests[i].stop_event->Signal(); |
308 } | 301 } |
309 } | 302 } |
310 } | 303 } |
311 | 304 |
312 WatcherThreadManager::WatcherThreadManager() | 305 WatcherThreadManager::WatcherThreadManager() |
313 : thread_(kWatcherThreadName) { | 306 : thread_(kWatcherThreadName) { |
314 base::Thread::Options thread_options; | 307 base::Thread::Options thread_options; |
315 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); | 308 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); |
316 thread_.StartWithOptions(thread_options); | 309 thread_.StartWithOptions(thread_options); |
317 } | 310 } |
318 | 311 |
319 // HandleWatcher::State -------------------------------------------------------- | 312 // HandleWatcher::StateBase and subclasses ------------------------------------- |
320 | 313 |
321 // Represents the state of the HandleWatcher. Owns the user's callback and | 314 // The base class of HandleWatcher's state. Owns the user's callback and |
322 // monitors the current thread's MessageLoop to know when to force the callback | 315 // monitors the current thread's MessageLoop to know when to force the callback |
323 // to run (with an error) even though the pipe hasn't been signaled yet. | 316 // to run (with an error) even though the pipe hasn't been signaled yet. |
324 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { | 317 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { |
sky
2014/08/27 21:37:27
Is the DestructionObserver code necessary for Same
yzshen1
2014/08/27 22:31:48
Thanks for the suggestion!
According to discussion
| |
325 public: | 318 public: |
326 State(HandleWatcher* watcher, | 319 StateBase(HandleWatcher* watcher, |
327 const Handle& handle, | 320 const base::Callback<void(MojoResult)>& callback) |
328 MojoHandleSignals handle_signals, | |
329 MojoDeadline deadline, | |
330 const base::Callback<void(MojoResult)>& callback) | |
331 : watcher_(watcher), | 321 : watcher_(watcher), |
332 callback_(callback), | 322 callback_(callback), |
333 got_ready_(false), | 323 got_ready_(false) { |
334 weak_factory_(this) { | |
335 base::MessageLoop::current()->AddDestructionObserver(this); | 324 base::MessageLoop::current()->AddDestructionObserver(this); |
336 | |
337 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | |
338 handle, | |
339 handle_signals, | |
340 MojoDeadlineToTimeTicks(deadline), | |
341 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr())); | |
342 } | 325 } |
343 | 326 |
344 virtual ~State() { | 327 virtual ~StateBase() { |
345 base::MessageLoop::current()->RemoveDestructionObserver(this); | 328 base::MessageLoop::current()->RemoveDestructionObserver(this); |
329 } | |
346 | 330 |
347 // If we've been notified the handle is ready (|got_ready_| is true) then | 331 void NotifyHandleReady(MojoResult result) { |
348 // the watch has been implicitly removed by | 332 got_ready_ = true; |
349 // WatcherThreadManager/MessagePumpMojo and we don't have to call | 333 NotifyAndDestroy(result); |
350 // StopWatching(). To do so would needlessly entail posting a task and | |
351 // blocking until the background thread services it. | |
352 if (!got_ready_) | |
353 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | |
354 } | 334 } |
355 | 335 |
336 bool got_ready() const { return got_ready_; } | |
337 | |
356 private: | 338 private: |
357 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { | 339 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { |
358 // The current thread is exiting. Simulate a watch error. | 340 // The current thread is exiting. Simulate a watch error. |
359 NotifyAndDestroy(MOJO_RESULT_ABORTED); | 341 NotifyAndDestroy(MOJO_RESULT_ABORTED); |
360 } | 342 } |
361 | 343 |
362 void OnHandleReady(MojoResult result) { | |
363 got_ready_ = true; | |
364 NotifyAndDestroy(result); | |
365 } | |
366 | |
367 void NotifyAndDestroy(MojoResult result) { | 344 void NotifyAndDestroy(MojoResult result) { |
368 base::Callback<void(MojoResult)> callback = callback_; | 345 base::Callback<void(MojoResult)> callback = callback_; |
369 watcher_->Stop(); // Destroys |this|. | 346 watcher_->Stop(); // Destroys |this|. |
370 | 347 |
371 callback.Run(result); | 348 callback.Run(result); |
372 } | 349 } |
373 | 350 |
374 HandleWatcher* watcher_; | 351 HandleWatcher* watcher_; |
375 WatcherID watcher_id_; | |
376 base::Callback<void(MojoResult)> callback_; | 352 base::Callback<void(MojoResult)> callback_; |
377 | 353 |
378 // Have we been notified that the handle is ready? | 354 // Have we been notified that the handle is ready? |
379 bool got_ready_; | 355 bool got_ready_; |
356 }; | |
sky
2014/08/27 21:37:27
DISALLOW_...
yzshen1
2014/08/27 22:31:48
Done.
| |
357 | |
358 class HandleWatcher::SameThreadWatchingState : public StateBase, | |
sky
2014/08/27 21:37:27
Add description, especially since same thread is n
| |
359 public MessagePumpMojoHandler { | |
360 public: | |
361 SameThreadWatchingState(HandleWatcher* watcher, | |
362 const Handle& handle, | |
363 MojoHandleSignals handle_signals, | |
364 MojoDeadline deadline, | |
365 const base::Callback<void(MojoResult)>& callback) | |
366 : StateBase(watcher, callback), | |
367 handle_(handle) { | |
368 DCHECK(MessagePumpMojo::IsCurrent()); | |
369 | |
370 MessagePumpMojo::current()->AddHandler( | |
371 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); | |
372 } | |
373 | |
374 virtual ~SameThreadWatchingState() { | |
375 if (!got_ready()) | |
376 MessagePumpMojo::current()->RemoveHandler(handle_); | |
377 } | |
378 | |
379 private: | |
380 // MessagePumpMojoHandler overrides: | |
381 virtual void OnHandleReady(const Handle& handle) OVERRIDE { | |
382 DCHECK_EQ(handle.value(), handle_.value()); | |
383 MessagePumpMojo::current()->RemoveHandler(handle_); | |
384 NotifyHandleReady(MOJO_RESULT_OK); | |
385 } | |
386 | |
387 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE { | |
388 DCHECK_EQ(handle.value(), handle_.value()); | |
389 MessagePumpMojo::current()->RemoveHandler(handle_); | |
390 NotifyHandleReady(result); | |
391 } | |
392 | |
393 Handle handle_; | |
394 }; | |
sky
2014/08/27 21:37:27
DISALLOW..
yzshen1
2014/08/27 22:31:48
Done.
| |
395 | |
396 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { | |
397 public: | |
398 SecondaryThreadWatchingState(HandleWatcher* watcher, | |
399 const Handle& handle, | |
400 MojoHandleSignals handle_signals, | |
401 MojoDeadline deadline, | |
402 const base::Callback<void(MojoResult)>& callback) | |
403 : StateBase(watcher, callback), | |
404 weak_factory_(this) { | |
405 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | |
406 handle, | |
407 handle_signals, | |
408 MojoDeadlineToTimeTicks(deadline), | |
409 base::Bind(&StateBase::NotifyHandleReady, weak_factory_.GetWeakPtr())); | |
410 } | |
411 | |
412 virtual ~SecondaryThreadWatchingState() { | |
413 // If we've been notified the handle is ready (|got_ready()| is true) then | |
414 // the watch has been implicitly removed by | |
415 // WatcherThreadManager/MessagePumpMojo and we don't have to call | |
416 // StopWatching(). To do so would needlessly entail posting a task and | |
417 // blocking until the background thread services it. | |
418 if (!got_ready()) | |
419 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | |
420 } | |
421 | |
422 private: | |
423 WatcherID watcher_id_; | |
380 | 424 |
381 // Used to weakly bind |this| to the WatcherThreadManager. | 425 // Used to weakly bind |this| to the WatcherThreadManager. |
382 base::WeakPtrFactory<State> weak_factory_; | 426 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; |
383 }; | 427 }; |
sky
2014/08/27 21:37:27
DISALLOW...
yzshen1
2014/08/27 22:31:48
Done.
| |
384 | 428 |
385 // HandleWatcher --------------------------------------------------------------- | 429 // HandleWatcher --------------------------------------------------------------- |
386 | 430 |
387 HandleWatcher::HandleWatcher() { | 431 HandleWatcher::HandleWatcher() { |
388 } | 432 } |
389 | 433 |
390 HandleWatcher::~HandleWatcher() { | 434 HandleWatcher::~HandleWatcher() { |
391 } | 435 } |
392 | 436 |
393 void HandleWatcher::Start(const Handle& handle, | 437 void HandleWatcher::Start(const Handle& handle, |
394 MojoHandleSignals handle_signals, | 438 MojoHandleSignals handle_signals, |
395 MojoDeadline deadline, | 439 MojoDeadline deadline, |
396 const base::Callback<void(MojoResult)>& callback) { | 440 const base::Callback<void(MojoResult)>& callback) { |
397 DCHECK(handle.is_valid()); | 441 DCHECK(handle.is_valid()); |
398 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); | 442 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); |
399 | 443 |
400 state_.reset(new State(this, handle, handle_signals, deadline, callback)); | 444 if (MessagePumpMojo::IsCurrent()) { |
445 state_.reset(new SameThreadWatchingState( | |
446 this, handle, handle_signals, deadline, callback)); | |
447 } else { | |
448 state_.reset(new SecondaryThreadWatchingState( | |
449 this, handle, handle_signals, deadline, callback)); | |
450 } | |
401 } | 451 } |
402 | 452 |
403 void HandleWatcher::Stop() { | 453 void HandleWatcher::Stop() { |
404 state_.reset(); | 454 state_.reset(); |
405 } | 455 } |
406 | 456 |
407 } // namespace common | 457 } // namespace common |
408 } // namespace mojo | 458 } // namespace mojo |
OLD | NEW |