Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(160)

Side by Side Diff: mojo/common/handle_watcher.cc

Issue 506353002: Make HandleWatcher watch on the same thread if the thread is running a MessagePumpMojo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: updated unittests Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/common/handle_watcher.h" 5 #include "mojo/common/handle_watcher.h"
6 6
7 #include <map> 7 #include <map>
8 8
9 #include "base/atomic_sequence_num.h" 9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/lazy_instance.h" 11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
12 #include "base/memory/singleton.h" 14 #include "base/memory/singleton.h"
13 #include "base/memory/weak_ptr.h" 15 #include "base/memory/weak_ptr.h"
14 #include "base/message_loop/message_loop.h" 16 #include "base/message_loop/message_loop.h"
15 #include "base/message_loop/message_loop_proxy.h" 17 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h" 18 #include "base/synchronization/lock.h"
17 #include "base/synchronization/waitable_event.h" 19 #include "base/synchronization/waitable_event.h"
18 #include "base/threading/thread.h" 20 #include "base/threading/thread.h"
19 #include "base/threading/thread_restrictions.h" 21 #include "base/threading/thread_restrictions.h"
20 #include "base/time/time.h" 22 #include "base/time/time.h"
21 #include "mojo/common/message_pump_mojo.h" 23 #include "mojo/common/message_pump_mojo.h"
22 #include "mojo/common/message_pump_mojo_handler.h" 24 #include "mojo/common/message_pump_mojo_handler.h"
23 #include "mojo/common/time_helper.h" 25 #include "mojo/common/time_helper.h"
24 26
25 namespace mojo { 27 namespace mojo {
26 namespace common { 28 namespace common {
27 29
28 typedef int WatcherID; 30 typedef int WatcherID;
29 31
30 namespace { 32 namespace {
31 33
32 const char kWatcherThreadName[] = "handle-watcher-thread"; 34 const char kWatcherThreadName[] = "handle-watcher-thread";
33 35
34 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
35 MessagePumpMojo* message_pump_mojo = NULL;
36
37 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
38 message_pump_mojo = new MessagePumpMojo;
39 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
40 }
41
42 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { 36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
43 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 37 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
44 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
45 } 39 }
46 40
47 // Tracks the data for a single call to Start(). 41 // Tracks the data for a single call to Start().
48 struct WatchData { 42 struct WatchData {
49 WatchData() 43 WatchData()
50 : id(0), 44 : id(0),
51 handle_signals(MOJO_HANDLE_SIGNAL_NONE), 45 handle_signals(MOJO_HANDLE_SIGNAL_NONE),
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 93
100 WatcherBackend::~WatcherBackend() { 94 WatcherBackend::~WatcherBackend() {
101 } 95 }
102 96
103 void WatcherBackend::StartWatching(const WatchData& data) { 97 void WatcherBackend::StartWatching(const WatchData& data) {
104 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 98 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
105 99
106 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 100 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
107 101
108 handle_to_data_[data.handle] = data; 102 handle_to_data_[data.handle] = data;
109 message_pump_mojo->AddHandler(this, data.handle, 103 MessagePumpMojo::current()->AddHandler(this, data.handle,
110 data.handle_signals, 104 data.handle_signals,
111 data.deadline); 105 data.deadline);
112 } 106 }
113 107
114 void WatcherBackend::StopWatching(WatcherID watcher_id) { 108 void WatcherBackend::StopWatching(WatcherID watcher_id) {
115 // Because of the thread hop it is entirely possible to get here and not 109 // Because of the thread hop it is entirely possible to get here and not
116 // have a valid handle registered for |watcher_id|. 110 // have a valid handle registered for |watcher_id|.
117 Handle handle; 111 Handle handle;
118 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { 112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
119 handle_to_data_.erase(handle); 113 handle_to_data_.erase(handle);
120 message_pump_mojo->RemoveHandler(handle); 114 MessagePumpMojo::current()->RemoveHandler(handle);
121 } 115 }
122 } 116 }
123 117
124 void WatcherBackend::RemoveAndNotify(const Handle& handle, 118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
125 MojoResult result) { 119 MojoResult result) {
126 if (handle_to_data_.count(handle) == 0) 120 if (handle_to_data_.count(handle) == 0)
127 return; 121 return;
128 122
129 const WatchData data(handle_to_data_[handle]); 123 const WatchData data(handle_to_data_[handle]);
130 handle_to_data_.erase(handle); 124 handle_to_data_.erase(handle);
131 message_pump_mojo->RemoveHandler(handle); 125 MessagePumpMojo::current()->RemoveHandler(handle);
132 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); 126 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
133 } 127 }
134 128
135 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, 129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
136 Handle* handle) const { 130 Handle* handle) const {
137 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); 131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
138 i != handle_to_data_.end(); ++i) { 132 i != handle_to_data_.end(); ++i) {
139 if (i->second.id == watcher_id) { 133 if (i->second.id == watcher_id) {
140 *handle = i->second.handle; 134 *handle = i->second.handle;
141 return true; 135 return true;
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after
305 } else { 299 } else {
306 backend_.StopWatching(requests[i].stop_id); 300 backend_.StopWatching(requests[i].stop_id);
307 requests[i].stop_event->Signal(); 301 requests[i].stop_event->Signal();
308 } 302 }
309 } 303 }
310 } 304 }
311 305
312 WatcherThreadManager::WatcherThreadManager() 306 WatcherThreadManager::WatcherThreadManager()
313 : thread_(kWatcherThreadName) { 307 : thread_(kWatcherThreadName) {
314 base::Thread::Options thread_options; 308 base::Thread::Options thread_options;
315 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); 309 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
316 thread_.StartWithOptions(thread_options); 310 thread_.StartWithOptions(thread_options);
317 } 311 }
318 312
319 // HandleWatcher::State -------------------------------------------------------- 313 // HandleWatcher::StateBase and subclasses -------------------------------------
320 314
321 // Represents the state of the HandleWatcher. Owns the user's callback and 315 // The base class of HandleWatcher's state. Owns the user's callback and
322 // monitors the current thread's MessageLoop to know when to force the callback 316 // monitors the current thread's MessageLoop to know when to force the callback
323 // to run (with an error) even though the pipe hasn't been signaled yet. 317 // to run (with an error) even though the pipe hasn't been signaled yet.
324 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { 318 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
325 public: 319 public:
326 State(HandleWatcher* watcher, 320 StateBase(HandleWatcher* watcher,
327 const Handle& handle, 321 const base::Callback<void(MojoResult)>& callback)
328 MojoHandleSignals handle_signals,
329 MojoDeadline deadline,
330 const base::Callback<void(MojoResult)>& callback)
331 : watcher_(watcher), 322 : watcher_(watcher),
332 callback_(callback), 323 callback_(callback),
333 got_ready_(false), 324 got_ready_(false) {
334 weak_factory_(this) {
335 base::MessageLoop::current()->AddDestructionObserver(this); 325 base::MessageLoop::current()->AddDestructionObserver(this);
336
337 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
338 handle,
339 handle_signals,
340 MojoDeadlineToTimeTicks(deadline),
341 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
342 } 326 }
343 327
344 virtual ~State() { 328 virtual ~StateBase() {
345 base::MessageLoop::current()->RemoveDestructionObserver(this); 329 base::MessageLoop::current()->RemoveDestructionObserver(this);
330 }
346 331
347 // If we've been notified the handle is ready (|got_ready_| is true) then 332 protected:
348 // the watch has been implicitly removed by 333 void NotifyHandleReady(MojoResult result) {
349 // WatcherThreadManager/MessagePumpMojo and we don't have to call 334 got_ready_ = true;
350 // StopWatching(). To do so would needlessly entail posting a task and 335 NotifyAndDestroy(result);
351 // blocking until the background thread services it.
352 if (!got_ready_)
353 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
354 } 336 }
355 337
338 bool got_ready() const { return got_ready_; }
339
356 private: 340 private:
357 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { 341 virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
358 // The current thread is exiting. Simulate a watch error. 342 // The current thread is exiting. Simulate a watch error.
359 NotifyAndDestroy(MOJO_RESULT_ABORTED); 343 NotifyAndDestroy(MOJO_RESULT_ABORTED);
360 } 344 }
361 345
362 void OnHandleReady(MojoResult result) {
363 got_ready_ = true;
364 NotifyAndDestroy(result);
365 }
366
367 void NotifyAndDestroy(MojoResult result) { 346 void NotifyAndDestroy(MojoResult result) {
368 base::Callback<void(MojoResult)> callback = callback_; 347 base::Callback<void(MojoResult)> callback = callback_;
369 watcher_->Stop(); // Destroys |this|. 348 watcher_->Stop(); // Destroys |this|.
370 349
371 callback.Run(result); 350 callback.Run(result);
372 } 351 }
373 352
374 HandleWatcher* watcher_; 353 HandleWatcher* watcher_;
375 WatcherID watcher_id_;
376 base::Callback<void(MojoResult)> callback_; 354 base::Callback<void(MojoResult)> callback_;
377 355
378 // Have we been notified that the handle is ready? 356 // Have we been notified that the handle is ready?
379 bool got_ready_; 357 bool got_ready_;
380 358
359 DISALLOW_COPY_AND_ASSIGN(StateBase);
360 };
361
362 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
363 // SameThreadWatchingState is used to directly watch the handle on the same
364 // thread.
365 class HandleWatcher::SameThreadWatchingState : public StateBase,
366 public MessagePumpMojoHandler {
367 public:
368 SameThreadWatchingState(HandleWatcher* watcher,
369 const Handle& handle,
370 MojoHandleSignals handle_signals,
371 MojoDeadline deadline,
372 const base::Callback<void(MojoResult)>& callback)
373 : StateBase(watcher, callback),
374 handle_(handle) {
375 DCHECK(MessagePumpMojo::IsCurrent());
376
377 MessagePumpMojo::current()->AddHandler(
378 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
379 }
380
381 virtual ~SameThreadWatchingState() {
382 if (!got_ready())
383 MessagePumpMojo::current()->RemoveHandler(handle_);
384 }
385
386 private:
387 // MessagePumpMojoHandler overrides:
388 virtual void OnHandleReady(const Handle& handle) OVERRIDE {
389 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
390 }
391
392 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
393 StopWatchingAndNotifyReady(handle, result);
394 }
395
396 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
397 DCHECK_EQ(handle.value(), handle_.value());
398 MessagePumpMojo::current()->RemoveHandler(handle_);
399 NotifyHandleReady(result);
400 }
401
402 Handle handle_;
403
404 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
405 };
406
407 // If the thread on which HandleWatcher is used runs a message pump different
408 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
409 // handle on the handle watcher thread.
410 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
411 public:
412 SecondaryThreadWatchingState(HandleWatcher* watcher,
413 const Handle& handle,
414 MojoHandleSignals handle_signals,
415 MojoDeadline deadline,
416 const base::Callback<void(MojoResult)>& callback)
417 : StateBase(watcher, callback),
418 weak_factory_(this) {
419 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
420 handle,
421 handle_signals,
422 MojoDeadlineToTimeTicks(deadline),
423 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
424 weak_factory_.GetWeakPtr()));
425 }
426
427 virtual ~SecondaryThreadWatchingState() {
428 // If we've been notified the handle is ready (|got_ready()| is true) then
429 // the watch has been implicitly removed by
430 // WatcherThreadManager/MessagePumpMojo and we don't have to call
431 // StopWatching(). To do so would needlessly entail posting a task and
432 // blocking until the background thread services it.
433 if (!got_ready())
434 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
435 }
436
437 private:
438 WatcherID watcher_id_;
439
381 // Used to weakly bind |this| to the WatcherThreadManager. 440 // Used to weakly bind |this| to the WatcherThreadManager.
382 base::WeakPtrFactory<State> weak_factory_; 441 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
442
443 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
383 }; 444 };
384 445
385 // HandleWatcher --------------------------------------------------------------- 446 // HandleWatcher ---------------------------------------------------------------
386 447
387 HandleWatcher::HandleWatcher() { 448 HandleWatcher::HandleWatcher() {
388 } 449 }
389 450
390 HandleWatcher::~HandleWatcher() { 451 HandleWatcher::~HandleWatcher() {
391 } 452 }
392 453
393 void HandleWatcher::Start(const Handle& handle, 454 void HandleWatcher::Start(const Handle& handle,
394 MojoHandleSignals handle_signals, 455 MojoHandleSignals handle_signals,
395 MojoDeadline deadline, 456 MojoDeadline deadline,
396 const base::Callback<void(MojoResult)>& callback) { 457 const base::Callback<void(MojoResult)>& callback) {
397 DCHECK(handle.is_valid()); 458 DCHECK(handle.is_valid());
398 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); 459 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
399 460
400 state_.reset(new State(this, handle, handle_signals, deadline, callback)); 461 if (MessagePumpMojo::IsCurrent()) {
462 state_.reset(new SameThreadWatchingState(
463 this, handle, handle_signals, deadline, callback));
464 } else {
465 state_.reset(new SecondaryThreadWatchingState(
466 this, handle, handle_signals, deadline, callback));
467 }
401 } 468 }
402 469
403 void HandleWatcher::Stop() { 470 void HandleWatcher::Stop() {
404 state_.reset(); 471 state_.reset();
405 } 472 }
406 473
407 } // namespace common 474 } // namespace common
408 } // namespace mojo 475 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698