OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/message_pump/handle_watcher.h" | 5 #include "mojo/message_pump/handle_watcher.h" |
6 | 6 |
7 #include <map> | 7 #include <map> |
8 | 8 |
9 #include "base/atomic_sequence_num.h" | 9 #include "base/atomic_sequence_num.h" |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 24 matching lines...) Expand all Loading... |
35 const char kWatcherThreadName[] = "handle-watcher-thread"; | 35 const char kWatcherThreadName[] = "handle-watcher-thread"; |
36 | 36 |
37 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { | 37 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { |
38 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : | 38 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : |
39 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); | 39 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); |
40 } | 40 } |
41 | 41 |
42 // Tracks the data for a single call to Start(). | 42 // Tracks the data for a single call to Start(). |
43 struct WatchData { | 43 struct WatchData { |
44 WatchData() | 44 WatchData() |
45 : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {} | 45 : location(0), |
| 46 id(0), |
| 47 handle_signals(MOJO_HANDLE_SIGNAL_NONE), |
| 48 task_runner(NULL) {} |
46 | 49 |
| 50 int location; |
47 WatcherID id; | 51 WatcherID id; |
48 Handle handle; | 52 Handle handle; |
49 MojoHandleSignals handle_signals; | 53 MojoHandleSignals handle_signals; |
50 base::TimeTicks deadline; | 54 base::TimeTicks deadline; |
51 base::Callback<void(MojoResult)> callback; | 55 base::Callback<void(MojoResult)> callback; |
52 scoped_refptr<base::SingleThreadTaskRunner> task_runner; | 56 scoped_refptr<base::SingleThreadTaskRunner> task_runner; |
53 }; | 57 }; |
54 | 58 |
55 // WatcherBackend -------------------------------------------------------------- | 59 // WatcherBackend -------------------------------------------------------------- |
56 | 60 |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
92 | 96 |
93 WatcherBackend::~WatcherBackend() { | 97 WatcherBackend::~WatcherBackend() { |
94 } | 98 } |
95 | 99 |
96 void WatcherBackend::StartWatching(const WatchData& data) { | 100 void WatcherBackend::StartWatching(const WatchData& data) { |
97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); | 101 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
98 | 102 |
99 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); | 103 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
100 | 104 |
101 handle_to_data_[data.handle] = data; | 105 handle_to_data_[data.handle] = data; |
102 MessagePumpMojo::current()->AddHandler(this, data.handle, | 106 MessagePumpMojo::current()->AddHandler(data.location, this, data.handle, |
103 data.handle_signals, | 107 data.handle_signals, data.deadline); |
104 data.deadline); | |
105 } | 108 } |
106 | 109 |
107 void WatcherBackend::StopWatching(WatcherID watcher_id) { | 110 void WatcherBackend::StopWatching(WatcherID watcher_id) { |
108 // Because of the thread hop it is entirely possible to get here and not | 111 // Because of the thread hop it is entirely possible to get here and not |
109 // have a valid handle registered for |watcher_id|. | 112 // have a valid handle registered for |watcher_id|. |
110 Handle handle; | 113 Handle handle; |
111 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | 114 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { |
112 handle_to_data_.erase(handle); | 115 handle_to_data_.erase(handle); |
113 MessagePumpMojo::current()->RemoveHandler(handle); | 116 MessagePumpMojo::current()->RemoveHandler(handle); |
114 } | 117 } |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
156 public: | 159 public: |
157 ~WatcherThreadManager(); | 160 ~WatcherThreadManager(); |
158 | 161 |
159 // Returns the shared instance. | 162 // Returns the shared instance. |
160 static WatcherThreadManager* GetInstance(); | 163 static WatcherThreadManager* GetInstance(); |
161 | 164 |
162 // Starts watching the requested handle. Returns a unique ID that is used to | 165 // Starts watching the requested handle. Returns a unique ID that is used to |
163 // stop watching the handle. When the handle is ready |callback| is notified | 166 // stop watching the handle. When the handle is ready |callback| is notified |
164 // on the thread StartWatching() was invoked on. | 167 // on the thread StartWatching() was invoked on. |
165 // This may be invoked on any thread. | 168 // This may be invoked on any thread. |
166 WatcherID StartWatching(const Handle& handle, | 169 WatcherID StartWatching(int location, |
| 170 const Handle& handle, |
167 MojoHandleSignals handle_signals, | 171 MojoHandleSignals handle_signals, |
168 base::TimeTicks deadline, | 172 base::TimeTicks deadline, |
169 const base::Callback<void(MojoResult)>& callback); | 173 const base::Callback<void(MojoResult)>& callback); |
170 | 174 |
171 // Stops watching a handle. | 175 // Stops watching a handle. |
172 // This may be invoked on any thread. | 176 // This may be invoked on any thread. |
173 void StopWatching(WatcherID watcher_id); | 177 void StopWatching(WatcherID watcher_id); |
174 | 178 |
175 private: | 179 private: |
176 enum RequestType { | 180 enum RequestType { |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
219 | 223 |
220 WatcherThreadManager::~WatcherThreadManager() { | 224 WatcherThreadManager::~WatcherThreadManager() { |
221 thread_.Stop(); | 225 thread_.Stop(); |
222 } | 226 } |
223 | 227 |
224 WatcherThreadManager* WatcherThreadManager::GetInstance() { | 228 WatcherThreadManager* WatcherThreadManager::GetInstance() { |
225 return base::Singleton<WatcherThreadManager>::get(); | 229 return base::Singleton<WatcherThreadManager>::get(); |
226 } | 230 } |
227 | 231 |
228 WatcherID WatcherThreadManager::StartWatching( | 232 WatcherID WatcherThreadManager::StartWatching( |
| 233 int location, |
229 const Handle& handle, | 234 const Handle& handle, |
230 MojoHandleSignals handle_signals, | 235 MojoHandleSignals handle_signals, |
231 base::TimeTicks deadline, | 236 base::TimeTicks deadline, |
232 const base::Callback<void(MojoResult)>& callback) { | 237 const base::Callback<void(MojoResult)>& callback) { |
233 RequestData request_data; | 238 RequestData request_data; |
234 request_data.type = REQUEST_START; | 239 request_data.type = REQUEST_START; |
| 240 request_data.start_data.location = location; |
235 request_data.start_data.id = watcher_id_generator_.GetNext(); | 241 request_data.start_data.id = watcher_id_generator_.GetNext(); |
236 request_data.start_data.handle = handle; | 242 request_data.start_data.handle = handle; |
237 request_data.start_data.callback = callback; | 243 request_data.start_data.callback = callback; |
238 request_data.start_data.handle_signals = handle_signals; | 244 request_data.start_data.handle_signals = handle_signals; |
239 request_data.start_data.deadline = deadline; | 245 request_data.start_data.deadline = deadline; |
240 request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get(); | 246 request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get(); |
241 AddRequest(request_data); | 247 AddRequest(request_data); |
242 return request_data.start_data.id; | 248 return request_data.start_data.id; |
243 } | 249 } |
244 | 250 |
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
365 public: | 371 public: |
366 SameThreadWatchingState(HandleWatcher* watcher, | 372 SameThreadWatchingState(HandleWatcher* watcher, |
367 const Handle& handle, | 373 const Handle& handle, |
368 MojoHandleSignals handle_signals, | 374 MojoHandleSignals handle_signals, |
369 MojoDeadline deadline, | 375 MojoDeadline deadline, |
370 const base::Callback<void(MojoResult)>& callback) | 376 const base::Callback<void(MojoResult)>& callback) |
371 : StateBase(watcher, callback), | 377 : StateBase(watcher, callback), |
372 handle_(handle) { | 378 handle_(handle) { |
373 DCHECK(MessagePumpMojo::IsCurrent()); | 379 DCHECK(MessagePumpMojo::IsCurrent()); |
374 | 380 |
375 MessagePumpMojo::current()->AddHandler( | 381 MessagePumpMojo::current()->AddHandler(watcher->location(), this, handle, |
376 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); | 382 handle_signals, |
| 383 MojoDeadlineToTimeTicks(deadline)); |
377 } | 384 } |
378 | 385 |
379 ~SameThreadWatchingState() override { | 386 ~SameThreadWatchingState() override { |
380 if (!got_ready()) | 387 if (!got_ready()) |
381 MessagePumpMojo::current()->RemoveHandler(handle_); | 388 MessagePumpMojo::current()->RemoveHandler(handle_); |
382 } | 389 } |
383 | 390 |
384 private: | 391 private: |
385 // MessagePumpMojoHandler overrides: | 392 // MessagePumpMojoHandler overrides: |
386 void OnHandleReady(const Handle& handle) override { | 393 void OnHandleReady(const Handle& handle) override { |
(...skipping 21 matching lines...) Expand all Loading... |
408 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { | 415 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { |
409 public: | 416 public: |
410 SecondaryThreadWatchingState(HandleWatcher* watcher, | 417 SecondaryThreadWatchingState(HandleWatcher* watcher, |
411 const Handle& handle, | 418 const Handle& handle, |
412 MojoHandleSignals handle_signals, | 419 MojoHandleSignals handle_signals, |
413 MojoDeadline deadline, | 420 MojoDeadline deadline, |
414 const base::Callback<void(MojoResult)>& callback) | 421 const base::Callback<void(MojoResult)>& callback) |
415 : StateBase(watcher, callback), | 422 : StateBase(watcher, callback), |
416 weak_factory_(this) { | 423 weak_factory_(this) { |
417 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | 424 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( |
418 handle, | 425 watcher->location(), handle, handle_signals, |
419 handle_signals, | |
420 MojoDeadlineToTimeTicks(deadline), | 426 MojoDeadlineToTimeTicks(deadline), |
421 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, | 427 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, |
422 weak_factory_.GetWeakPtr())); | 428 weak_factory_.GetWeakPtr())); |
423 } | 429 } |
424 | 430 |
425 ~SecondaryThreadWatchingState() override { | 431 ~SecondaryThreadWatchingState() override { |
426 // If we've been notified the handle is ready (|got_ready()| is true) then | 432 // If we've been notified the handle is ready (|got_ready()| is true) then |
427 // the watch has been implicitly removed by | 433 // the watch has been implicitly removed by |
428 // WatcherThreadManager/MessagePumpMojo and we don't have to call | 434 // WatcherThreadManager/MessagePumpMojo and we don't have to call |
429 // StopWatching(). To do so would needlessly entail posting a task and | 435 // StopWatching(). To do so would needlessly entail posting a task and |
430 // blocking until the background thread services it. | 436 // blocking until the background thread services it. |
431 if (!got_ready()) | 437 if (!got_ready()) |
432 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | 438 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); |
433 } | 439 } |
434 | 440 |
435 private: | 441 private: |
436 WatcherID watcher_id_; | 442 WatcherID watcher_id_; |
437 | 443 |
438 // Used to weakly bind |this| to the WatcherThreadManager. | 444 // Used to weakly bind |this| to the WatcherThreadManager. |
439 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; | 445 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; |
440 | 446 |
441 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); | 447 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); |
442 }; | 448 }; |
443 | 449 |
444 // HandleWatcher --------------------------------------------------------------- | 450 // HandleWatcher --------------------------------------------------------------- |
445 | 451 |
446 HandleWatcher::HandleWatcher() { | 452 HandleWatcher::HandleWatcher(int location) : location_(location) {} |
447 } | |
448 | 453 |
449 HandleWatcher::~HandleWatcher() { | 454 HandleWatcher::~HandleWatcher() { |
450 } | 455 } |
451 | 456 |
452 void HandleWatcher::Start(const Handle& handle, | 457 void HandleWatcher::Start(const Handle& handle, |
453 MojoHandleSignals handle_signals, | 458 MojoHandleSignals handle_signals, |
454 MojoDeadline deadline, | 459 MojoDeadline deadline, |
455 const base::Callback<void(MojoResult)>& callback) { | 460 const base::Callback<void(MojoResult)>& callback) { |
456 DCHECK(handle.is_valid()); | 461 DCHECK(handle.is_valid()); |
457 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); | 462 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); |
458 | 463 |
459 // Need to clear the state before creating a new one. | 464 // Need to clear the state before creating a new one. |
460 state_.reset(); | 465 state_.reset(); |
461 if (MessagePumpMojo::IsCurrent()) { | 466 if (MessagePumpMojo::IsCurrent()) { |
462 state_.reset(new SameThreadWatchingState( | 467 state_.reset(new SameThreadWatchingState( |
463 this, handle, handle_signals, deadline, callback)); | 468 this, handle, handle_signals, deadline, callback)); |
464 } else { | 469 } else { |
465 state_.reset(new SecondaryThreadWatchingState( | 470 state_.reset(new SecondaryThreadWatchingState( |
466 this, handle, handle_signals, deadline, callback)); | 471 this, handle, handle_signals, deadline, callback)); |
467 } | 472 } |
468 } | 473 } |
469 | 474 |
470 void HandleWatcher::Stop() { | 475 void HandleWatcher::Stop() { |
471 state_.reset(); | 476 state_.reset(); |
472 } | 477 } |
473 | 478 |
474 } // namespace common | 479 } // namespace common |
475 } // namespace mojo | 480 } // namespace mojo |
OLD | NEW |