Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(297)

Side by Side Diff: mojo/common/handle_watcher.cc

Issue 506353002: Make HandleWatcher watch on the same thread if the thread is running a MessagePumpMojo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/common/handle_watcher.h ('k') | mojo/common/message_pump_mojo.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/common/handle_watcher.h" 5 #include "mojo/common/handle_watcher.h"
6 6
7 #include <map> 7 #include <map>
8 8
9 #include "base/atomic_sequence_num.h" 9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/lazy_instance.h" 11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
12 #include "base/memory/singleton.h" 14 #include "base/memory/singleton.h"
13 #include "base/memory/weak_ptr.h" 15 #include "base/memory/weak_ptr.h"
14 #include "base/message_loop/message_loop.h" 16 #include "base/message_loop/message_loop.h"
15 #include "base/message_loop/message_loop_proxy.h" 17 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h" 18 #include "base/synchronization/lock.h"
17 #include "base/synchronization/waitable_event.h" 19 #include "base/synchronization/waitable_event.h"
18 #include "base/threading/thread.h" 20 #include "base/threading/thread.h"
19 #include "base/threading/thread_restrictions.h" 21 #include "base/threading/thread_restrictions.h"
20 #include "base/time/time.h" 22 #include "base/time/time.h"
21 #include "mojo/common/message_pump_mojo.h" 23 #include "mojo/common/message_pump_mojo.h"
22 #include "mojo/common/message_pump_mojo_handler.h" 24 #include "mojo/common/message_pump_mojo_handler.h"
23 #include "mojo/common/time_helper.h" 25 #include "mojo/common/time_helper.h"
24 26
25 namespace mojo { 27 namespace mojo {
26 namespace common { 28 namespace common {
27 29
28 typedef int WatcherID; 30 typedef int WatcherID;
29 31
30 namespace { 32 namespace {
31 33
32 const char kWatcherThreadName[] = "handle-watcher-thread"; 34 const char kWatcherThreadName[] = "handle-watcher-thread";
33 35
34 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
35 MessagePumpMojo* message_pump_mojo = NULL;
36
37 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
38 message_pump_mojo = new MessagePumpMojo;
39 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
40 }
41
42 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { 36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
43 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 37 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
44 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
45 } 39 }
46 40
47 // Tracks the data for a single call to Start(). 41 // Tracks the data for a single call to Start().
48 struct WatchData { 42 struct WatchData {
49 WatchData() 43 WatchData()
50 : id(0), 44 : id(0),
51 handle_signals(MOJO_HANDLE_SIGNAL_NONE), 45 handle_signals(MOJO_HANDLE_SIGNAL_NONE),
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 93
100 WatcherBackend::~WatcherBackend() { 94 WatcherBackend::~WatcherBackend() {
101 } 95 }
102 96
103 void WatcherBackend::StartWatching(const WatchData& data) { 97 void WatcherBackend::StartWatching(const WatchData& data) {
104 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 98 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
105 99
106 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 100 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
107 101
108 handle_to_data_[data.handle] = data; 102 handle_to_data_[data.handle] = data;
109 message_pump_mojo->AddHandler(this, data.handle, 103 MessagePumpMojo::current()->AddHandler(this, data.handle,
110 data.handle_signals, 104 data.handle_signals,
111 data.deadline); 105 data.deadline);
112 } 106 }
113 107
114 void WatcherBackend::StopWatching(WatcherID watcher_id) { 108 void WatcherBackend::StopWatching(WatcherID watcher_id) {
115 // Because of the thread hop it is entirely possible to get here and not 109 // Because of the thread hop it is entirely possible to get here and not
116 // have a valid handle registered for |watcher_id|. 110 // have a valid handle registered for |watcher_id|.
117 Handle handle; 111 Handle handle;
118 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { 112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
119 handle_to_data_.erase(handle); 113 handle_to_data_.erase(handle);
120 message_pump_mojo->RemoveHandler(handle); 114 MessagePumpMojo::current()->RemoveHandler(handle);
121 } 115 }
122 } 116 }
123 117
124 void WatcherBackend::RemoveAndNotify(const Handle& handle, 118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
125 MojoResult result) { 119 MojoResult result) {
126 if (handle_to_data_.count(handle) == 0) 120 if (handle_to_data_.count(handle) == 0)
127 return; 121 return;
128 122
129 const WatchData data(handle_to_data_[handle]); 123 const WatchData data(handle_to_data_[handle]);
130 handle_to_data_.erase(handle); 124 handle_to_data_.erase(handle);
131 message_pump_mojo->RemoveHandler(handle); 125 MessagePumpMojo::current()->RemoveHandler(handle);
132 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); 126 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
133 } 127 }
134 128
135 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, 129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
136 Handle* handle) const { 130 Handle* handle) const {
137 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); 131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
138 i != handle_to_data_.end(); ++i) { 132 i != handle_to_data_.end(); ++i) {
139 if (i->second.id == watcher_id) { 133 if (i->second.id == watcher_id) {
140 *handle = i->second.handle; 134 *handle = i->second.handle;
141 return true; 135 return true;
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after
305 } else { 299 } else {
306 backend_.StopWatching(requests[i].stop_id); 300 backend_.StopWatching(requests[i].stop_id);
307 requests[i].stop_event->Signal(); 301 requests[i].stop_event->Signal();
308 } 302 }
309 } 303 }
310 } 304 }
311 305
312 WatcherThreadManager::WatcherThreadManager() 306 WatcherThreadManager::WatcherThreadManager()
313 : thread_(kWatcherThreadName) { 307 : thread_(kWatcherThreadName) {
314 base::Thread::Options thread_options; 308 base::Thread::Options thread_options;
315 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); 309 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
316 thread_.StartWithOptions(thread_options); 310 thread_.StartWithOptions(thread_options);
317 } 311 }
318 312
319 // HandleWatcher::State -------------------------------------------------------- 313 // HandleWatcher::StateBase and subclasses -------------------------------------
320 314
321 // Represents the state of the HandleWatcher. Owns the user's callback and 315 // The base class of HandleWatcher's state. Owns the user's callback and
322 // monitors the current thread's MessageLoop to know when to force the callback 316 // monitors the current thread's MessageLoop to know when to force the callback
323 // to run (with an error) even though the pipe hasn't been signaled yet. 317 // to run (with an error) even though the pipe hasn't been signaled yet.
324 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { 318 //
319 // TODO(yzshen): Very soon MessagePumpMojo will be changed to notify all handles
yzshen1 2014/08/28 19:26:05 (Actually we don't have such a plan, so removed th
320 // on its destruction. When that is landed, we shouldn't need the
321 // DestructionObserver for SameThreadWatchingState.
322 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
325 public: 323 public:
326 State(HandleWatcher* watcher, 324 StateBase(HandleWatcher* watcher,
327 const Handle& handle, 325 const base::Callback<void(MojoResult)>& callback)
328 MojoHandleSignals handle_signals,
329 MojoDeadline deadline,
330 const base::Callback<void(MojoResult)>& callback)
331 : watcher_(watcher), 326 : watcher_(watcher),
332 callback_(callback), 327 callback_(callback),
333 got_ready_(false), 328 got_ready_(false) {
334 weak_factory_(this) {
335 base::MessageLoop::current()->AddDestructionObserver(this); 329 base::MessageLoop::current()->AddDestructionObserver(this);
336
337 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
338 handle,
339 handle_signals,
340 MojoDeadlineToTimeTicks(deadline),
341 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
342 } 330 }
343 331
344 virtual ~State() { 332 virtual ~StateBase() {
345 base::MessageLoop::current()->RemoveDestructionObserver(this); 333 base::MessageLoop::current()->RemoveDestructionObserver(this);
334 }
346 335
347 // If we've been notified the handle is ready (|got_ready_| is true) then 336 void NotifyHandleReady(MojoResult result) {
sky 2014/08/27 22:49:45 nit: make this (and got_ready()) protected.
yzshen1 2014/08/28 19:26:05 Done.
348 // the watch has been implicitly removed by 337 got_ready_ = true;
349 // WatcherThreadManager/MessagePumpMojo and we don't have to call 338 NotifyAndDestroy(result);
350 // StopWatching(). To do so would needlessly entail posting a task and
351 // blocking until the background thread services it.
352 if (!got_ready_)
353 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
354 } 339 }
355 340
341 bool got_ready() const { return got_ready_; }
342
356 private: 343 private:
357 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { 344 virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
358 // The current thread is exiting. Simulate a watch error. 345 // The current thread is exiting. Simulate a watch error.
359 NotifyAndDestroy(MOJO_RESULT_ABORTED); 346 NotifyAndDestroy(MOJO_RESULT_ABORTED);
360 } 347 }
361 348
362 void OnHandleReady(MojoResult result) {
363 got_ready_ = true;
364 NotifyAndDestroy(result);
365 }
366
367 void NotifyAndDestroy(MojoResult result) { 349 void NotifyAndDestroy(MojoResult result) {
368 base::Callback<void(MojoResult)> callback = callback_; 350 base::Callback<void(MojoResult)> callback = callback_;
369 watcher_->Stop(); // Destroys |this|. 351 watcher_->Stop(); // Destroys |this|.
370 352
371 callback.Run(result); 353 callback.Run(result);
372 } 354 }
373 355
374 HandleWatcher* watcher_; 356 HandleWatcher* watcher_;
375 WatcherID watcher_id_;
376 base::Callback<void(MojoResult)> callback_; 357 base::Callback<void(MojoResult)> callback_;
377 358
378 // Have we been notified that the handle is ready? 359 // Have we been notified that the handle is ready?
379 bool got_ready_; 360 bool got_ready_;
380 361
362 DISALLOW_COPY_AND_ASSIGN(StateBase);
363 };
364
365 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
366 // SameThreadWatchingState is used to directly watch the handle on the same
367 // thread.
368 class HandleWatcher::SameThreadWatchingState : public StateBase,
369 public MessagePumpMojoHandler {
370 public:
371 SameThreadWatchingState(HandleWatcher* watcher,
372 const Handle& handle,
373 MojoHandleSignals handle_signals,
374 MojoDeadline deadline,
375 const base::Callback<void(MojoResult)>& callback)
376 : StateBase(watcher, callback),
377 handle_(handle) {
378 DCHECK(MessagePumpMojo::IsCurrent());
379
380 MessagePumpMojo::current()->AddHandler(
381 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
382 }
383
384 virtual ~SameThreadWatchingState() {
385 if (!got_ready())
386 MessagePumpMojo::current()->RemoveHandler(handle_);
387 }
388
389 private:
390 // MessagePumpMojoHandler overrides:
391 virtual void OnHandleReady(const Handle& handle) OVERRIDE {
392 DCHECK_EQ(handle.value(), handle_.value());
sky 2014/08/27 22:49:45 nit: refactor this and OnHandleError into a common
yzshen1 2014/08/28 19:26:05 Done.
393 MessagePumpMojo::current()->RemoveHandler(handle_);
394 NotifyHandleReady(MOJO_RESULT_OK);
395 }
396
397 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
398 DCHECK_EQ(handle.value(), handle_.value());
399 MessagePumpMojo::current()->RemoveHandler(handle_);
400 NotifyHandleReady(result);
401 }
402
403 Handle handle_;
404
405 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
406 };
407
408 // If the thread on which HandleWatcher is used runs a message pump different
409 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
410 // handle on the handle watcher thread.
411 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
412 public:
413 SecondaryThreadWatchingState(HandleWatcher* watcher,
414 const Handle& handle,
415 MojoHandleSignals handle_signals,
416 MojoDeadline deadline,
417 const base::Callback<void(MojoResult)>& callback)
418 : StateBase(watcher, callback),
419 weak_factory_(this) {
420 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
421 handle,
422 handle_signals,
423 MojoDeadlineToTimeTicks(deadline),
424 base::Bind(&StateBase::NotifyHandleReady, weak_factory_.GetWeakPtr()));
425 }
426
427 virtual ~SecondaryThreadWatchingState() {
428 // If we've been notified the handle is ready (|got_ready()| is true) then
429 // the watch has been implicitly removed by
430 // WatcherThreadManager/MessagePumpMojo and we don't have to call
431 // StopWatching(). To do so would needlessly entail posting a task and
432 // blocking until the background thread services it.
433 if (!got_ready())
434 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
435 }
436
437 private:
438 WatcherID watcher_id_;
439
381 // Used to weakly bind |this| to the WatcherThreadManager. 440 // Used to weakly bind |this| to the WatcherThreadManager.
382 base::WeakPtrFactory<State> weak_factory_; 441 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
442
443 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
383 }; 444 };
384 445
385 // HandleWatcher --------------------------------------------------------------- 446 // HandleWatcher ---------------------------------------------------------------
386 447
387 HandleWatcher::HandleWatcher() { 448 HandleWatcher::HandleWatcher() {
388 } 449 }
389 450
390 HandleWatcher::~HandleWatcher() { 451 HandleWatcher::~HandleWatcher() {
391 } 452 }
392 453
393 void HandleWatcher::Start(const Handle& handle, 454 void HandleWatcher::Start(const Handle& handle,
394 MojoHandleSignals handle_signals, 455 MojoHandleSignals handle_signals,
395 MojoDeadline deadline, 456 MojoDeadline deadline,
396 const base::Callback<void(MojoResult)>& callback) { 457 const base::Callback<void(MojoResult)>& callback) {
397 DCHECK(handle.is_valid()); 458 DCHECK(handle.is_valid());
398 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); 459 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
399 460
400 state_.reset(new State(this, handle, handle_signals, deadline, callback)); 461 if (MessagePumpMojo::IsCurrent()) {
462 state_.reset(new SameThreadWatchingState(
463 this, handle, handle_signals, deadline, callback));
464 } else {
465 state_.reset(new SecondaryThreadWatchingState(
466 this, handle, handle_signals, deadline, callback));
467 }
401 } 468 }
402 469
403 void HandleWatcher::Stop() { 470 void HandleWatcher::Stop() {
404 state_.reset(); 471 state_.reset();
405 } 472 }
406 473
407 } // namespace common 474 } // namespace common
408 } // namespace mojo 475 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/common/handle_watcher.h ('k') | mojo/common/message_pump_mojo.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698