Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: mojo/common/handle_watcher.cc

Issue 506353002: Make HandleWatcher watch on the same thread if the thread is running a MessagePumpMojo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/common/handle_watcher.h" 5 #include "mojo/common/handle_watcher.h"
6 6
7 #include <map> 7 #include <map>
8 8
9 #include "base/atomic_sequence_num.h" 9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/lazy_instance.h" 11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
12 #include "base/memory/singleton.h" 13 #include "base/memory/singleton.h"
13 #include "base/memory/weak_ptr.h" 14 #include "base/memory/weak_ptr.h"
14 #include "base/message_loop/message_loop.h" 15 #include "base/message_loop/message_loop.h"
15 #include "base/message_loop/message_loop_proxy.h" 16 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h" 17 #include "base/synchronization/lock.h"
17 #include "base/synchronization/waitable_event.h" 18 #include "base/synchronization/waitable_event.h"
18 #include "base/threading/thread.h" 19 #include "base/threading/thread.h"
19 #include "base/threading/thread_restrictions.h" 20 #include "base/threading/thread_restrictions.h"
20 #include "base/time/time.h" 21 #include "base/time/time.h"
21 #include "mojo/common/message_pump_mojo.h" 22 #include "mojo/common/message_pump_mojo.h"
22 #include "mojo/common/message_pump_mojo_handler.h" 23 #include "mojo/common/message_pump_mojo_handler.h"
23 #include "mojo/common/time_helper.h" 24 #include "mojo/common/time_helper.h"
24 25
25 namespace mojo { 26 namespace mojo {
26 namespace common { 27 namespace common {
27 28
28 typedef int WatcherID; 29 typedef int WatcherID;
29 30
30 namespace { 31 namespace {
31 32
32 const char kWatcherThreadName[] = "handle-watcher-thread"; 33 const char kWatcherThreadName[] = "handle-watcher-thread";
33 34
34 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
35 MessagePumpMojo* message_pump_mojo = NULL;
36
37 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
38 message_pump_mojo = new MessagePumpMojo;
39 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
40 }
41
42 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { 35 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
43 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 36 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
44 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 37 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
45 } 38 }
46 39
47 // Tracks the data for a single call to Start(). 40 // Tracks the data for a single call to Start().
48 struct WatchData { 41 struct WatchData {
49 WatchData() 42 WatchData()
50 : id(0), 43 : id(0),
51 handle_signals(MOJO_HANDLE_SIGNAL_NONE), 44 handle_signals(MOJO_HANDLE_SIGNAL_NONE),
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 92
100 WatcherBackend::~WatcherBackend() { 93 WatcherBackend::~WatcherBackend() {
101 } 94 }
102 95
103 void WatcherBackend::StartWatching(const WatchData& data) { 96 void WatcherBackend::StartWatching(const WatchData& data) {
104 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
105 98
106 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 99 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
107 100
108 handle_to_data_[data.handle] = data; 101 handle_to_data_[data.handle] = data;
109 message_pump_mojo->AddHandler(this, data.handle, 102 MessagePumpMojo::current()->AddHandler(this, data.handle,
110 data.handle_signals, 103 data.handle_signals,
111 data.deadline); 104 data.deadline);
112 } 105 }
113 106
114 void WatcherBackend::StopWatching(WatcherID watcher_id) { 107 void WatcherBackend::StopWatching(WatcherID watcher_id) {
115 // Because of the thread hop it is entirely possible to get here and not 108 // Because of the thread hop it is entirely possible to get here and not
116 // have a valid handle registered for |watcher_id|. 109 // have a valid handle registered for |watcher_id|.
117 Handle handle; 110 Handle handle;
118 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { 111 if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
119 handle_to_data_.erase(handle); 112 handle_to_data_.erase(handle);
120 message_pump_mojo->RemoveHandler(handle); 113 MessagePumpMojo::current()->RemoveHandler(handle);
121 } 114 }
122 } 115 }
123 116
124 void WatcherBackend::RemoveAndNotify(const Handle& handle, 117 void WatcherBackend::RemoveAndNotify(const Handle& handle,
125 MojoResult result) { 118 MojoResult result) {
126 if (handle_to_data_.count(handle) == 0) 119 if (handle_to_data_.count(handle) == 0)
127 return; 120 return;
128 121
129 const WatchData data(handle_to_data_[handle]); 122 const WatchData data(handle_to_data_[handle]);
130 handle_to_data_.erase(handle); 123 handle_to_data_.erase(handle);
131 message_pump_mojo->RemoveHandler(handle); 124 MessagePumpMojo::current()->RemoveHandler(handle);
132 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); 125 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
133 } 126 }
134 127
135 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, 128 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
136 Handle* handle) const { 129 Handle* handle) const {
137 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); 130 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
138 i != handle_to_data_.end(); ++i) { 131 i != handle_to_data_.end(); ++i) {
139 if (i->second.id == watcher_id) { 132 if (i->second.id == watcher_id) {
140 *handle = i->second.handle; 133 *handle = i->second.handle;
141 return true; 134 return true;
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after
305 } else { 298 } else {
306 backend_.StopWatching(requests[i].stop_id); 299 backend_.StopWatching(requests[i].stop_id);
307 requests[i].stop_event->Signal(); 300 requests[i].stop_event->Signal();
308 } 301 }
309 } 302 }
310 } 303 }
311 304
312 WatcherThreadManager::WatcherThreadManager() 305 WatcherThreadManager::WatcherThreadManager()
313 : thread_(kWatcherThreadName) { 306 : thread_(kWatcherThreadName) {
314 base::Thread::Options thread_options; 307 base::Thread::Options thread_options;
315 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); 308 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
316 thread_.StartWithOptions(thread_options); 309 thread_.StartWithOptions(thread_options);
317 } 310 }
318 311
319 // HandleWatcher::State -------------------------------------------------------- 312 // HandleWatcher::StateBase and subclasses -------------------------------------
320 313
321 // Represents the state of the HandleWatcher. Owns the user's callback and 314 // The base class of HandleWatcher's state. Owns the user's callback and
322 // monitors the current thread's MessageLoop to know when to force the callback 315 // monitors the current thread's MessageLoop to know when to force the callback
323 // to run (with an error) even though the pipe hasn't been signaled yet. 316 // to run (with an error) even though the pipe hasn't been signaled yet.
324 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { 317 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
sky 2014/08/27 21:37:27 Is the DestructionObserver code necessary for Same
yzshen1 2014/08/27 22:31:48 Thanks for the suggestion! According to discussion
325 public: 318 public:
326 State(HandleWatcher* watcher, 319 StateBase(HandleWatcher* watcher,
327 const Handle& handle, 320 const base::Callback<void(MojoResult)>& callback)
328 MojoHandleSignals handle_signals,
329 MojoDeadline deadline,
330 const base::Callback<void(MojoResult)>& callback)
331 : watcher_(watcher), 321 : watcher_(watcher),
332 callback_(callback), 322 callback_(callback),
333 got_ready_(false), 323 got_ready_(false) {
334 weak_factory_(this) {
335 base::MessageLoop::current()->AddDestructionObserver(this); 324 base::MessageLoop::current()->AddDestructionObserver(this);
336
337 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
338 handle,
339 handle_signals,
340 MojoDeadlineToTimeTicks(deadline),
341 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
342 } 325 }
343 326
344 virtual ~State() { 327 virtual ~StateBase() {
345 base::MessageLoop::current()->RemoveDestructionObserver(this); 328 base::MessageLoop::current()->RemoveDestructionObserver(this);
329 }
346 330
347 // If we've been notified the handle is ready (|got_ready_| is true) then 331 void NotifyHandleReady(MojoResult result) {
348 // the watch has been implicitly removed by 332 got_ready_ = true;
349 // WatcherThreadManager/MessagePumpMojo and we don't have to call 333 NotifyAndDestroy(result);
350 // StopWatching(). To do so would needlessly entail posting a task and
351 // blocking until the background thread services it.
352 if (!got_ready_)
353 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
354 } 334 }
355 335
336 bool got_ready() const { return got_ready_; }
337
356 private: 338 private:
357 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { 339 virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
358 // The current thread is exiting. Simulate a watch error. 340 // The current thread is exiting. Simulate a watch error.
359 NotifyAndDestroy(MOJO_RESULT_ABORTED); 341 NotifyAndDestroy(MOJO_RESULT_ABORTED);
360 } 342 }
361 343
362 void OnHandleReady(MojoResult result) {
363 got_ready_ = true;
364 NotifyAndDestroy(result);
365 }
366
367 void NotifyAndDestroy(MojoResult result) { 344 void NotifyAndDestroy(MojoResult result) {
368 base::Callback<void(MojoResult)> callback = callback_; 345 base::Callback<void(MojoResult)> callback = callback_;
369 watcher_->Stop(); // Destroys |this|. 346 watcher_->Stop(); // Destroys |this|.
370 347
371 callback.Run(result); 348 callback.Run(result);
372 } 349 }
373 350
374 HandleWatcher* watcher_; 351 HandleWatcher* watcher_;
375 WatcherID watcher_id_;
376 base::Callback<void(MojoResult)> callback_; 352 base::Callback<void(MojoResult)> callback_;
377 353
378 // Have we been notified that the handle is ready? 354 // Have we been notified that the handle is ready?
379 bool got_ready_; 355 bool got_ready_;
356 };
sky 2014/08/27 21:37:27 DISALLOW_...
yzshen1 2014/08/27 22:31:48 Done.
357
358 class HandleWatcher::SameThreadWatchingState : public StateBase,
sky 2014/08/27 21:37:27 Add description, especially since same thread is n
359 public MessagePumpMojoHandler {
360 public:
361 SameThreadWatchingState(HandleWatcher* watcher,
362 const Handle& handle,
363 MojoHandleSignals handle_signals,
364 MojoDeadline deadline,
365 const base::Callback<void(MojoResult)>& callback)
366 : StateBase(watcher, callback),
367 handle_(handle) {
368 DCHECK(MessagePumpMojo::IsCurrent());
369
370 MessagePumpMojo::current()->AddHandler(
371 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
372 }
373
374 virtual ~SameThreadWatchingState() {
375 if (!got_ready())
376 MessagePumpMojo::current()->RemoveHandler(handle_);
377 }
378
379 private:
380 // MessagePumpMojoHandler overrides:
381 virtual void OnHandleReady(const Handle& handle) OVERRIDE {
382 DCHECK_EQ(handle.value(), handle_.value());
383 MessagePumpMojo::current()->RemoveHandler(handle_);
384 NotifyHandleReady(MOJO_RESULT_OK);
385 }
386
387 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
388 DCHECK_EQ(handle.value(), handle_.value());
389 MessagePumpMojo::current()->RemoveHandler(handle_);
390 NotifyHandleReady(result);
391 }
392
393 Handle handle_;
394 };
sky 2014/08/27 21:37:27 DISALLOW..
yzshen1 2014/08/27 22:31:48 Done.
395
396 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
397 public:
398 SecondaryThreadWatchingState(HandleWatcher* watcher,
399 const Handle& handle,
400 MojoHandleSignals handle_signals,
401 MojoDeadline deadline,
402 const base::Callback<void(MojoResult)>& callback)
403 : StateBase(watcher, callback),
404 weak_factory_(this) {
405 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
406 handle,
407 handle_signals,
408 MojoDeadlineToTimeTicks(deadline),
409 base::Bind(&StateBase::NotifyHandleReady, weak_factory_.GetWeakPtr()));
410 }
411
412 virtual ~SecondaryThreadWatchingState() {
413 // If we've been notified the handle is ready (|got_ready()| is true) then
414 // the watch has been implicitly removed by
415 // WatcherThreadManager/MessagePumpMojo and we don't have to call
416 // StopWatching(). To do so would needlessly entail posting a task and
417 // blocking until the background thread services it.
418 if (!got_ready())
419 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
420 }
421
422 private:
423 WatcherID watcher_id_;
380 424
381 // Used to weakly bind |this| to the WatcherThreadManager. 425 // Used to weakly bind |this| to the WatcherThreadManager.
382 base::WeakPtrFactory<State> weak_factory_; 426 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
383 }; 427 };
sky 2014/08/27 21:37:27 DISALLOW...
yzshen1 2014/08/27 22:31:48 Done.
384 428
385 // HandleWatcher --------------------------------------------------------------- 429 // HandleWatcher ---------------------------------------------------------------
386 430
387 HandleWatcher::HandleWatcher() { 431 HandleWatcher::HandleWatcher() {
388 } 432 }
389 433
390 HandleWatcher::~HandleWatcher() { 434 HandleWatcher::~HandleWatcher() {
391 } 435 }
392 436
393 void HandleWatcher::Start(const Handle& handle, 437 void HandleWatcher::Start(const Handle& handle,
394 MojoHandleSignals handle_signals, 438 MojoHandleSignals handle_signals,
395 MojoDeadline deadline, 439 MojoDeadline deadline,
396 const base::Callback<void(MojoResult)>& callback) { 440 const base::Callback<void(MojoResult)>& callback) {
397 DCHECK(handle.is_valid()); 441 DCHECK(handle.is_valid());
398 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); 442 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
399 443
400 state_.reset(new State(this, handle, handle_signals, deadline, callback)); 444 if (MessagePumpMojo::IsCurrent()) {
445 state_.reset(new SameThreadWatchingState(
446 this, handle, handle_signals, deadline, callback));
447 } else {
448 state_.reset(new SecondaryThreadWatchingState(
449 this, handle, handle_signals, deadline, callback));
450 }
401 } 451 }
402 452
403 void HandleWatcher::Stop() { 453 void HandleWatcher::Stop() {
404 state_.reset(); 454 state_.reset();
405 } 455 }
406 456
407 } // namespace common 457 } // namespace common
408 } // namespace mojo 458 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698