OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/common/handle_watcher.h" | |
6 | |
7 #include <map> | |
8 | |
9 #include "base/atomic_sequence_num.h" | |
10 #include "base/bind.h" | |
11 #include "base/lazy_instance.h" | |
12 #include "base/logging.h" | |
13 #include "base/macros.h" | |
14 #include "base/memory/singleton.h" | |
15 #include "base/memory/weak_ptr.h" | |
16 #include "base/message_loop/message_loop.h" | |
17 #include "base/single_thread_task_runner.h" | |
18 #include "base/synchronization/lock.h" | |
19 #include "base/synchronization/waitable_event.h" | |
20 #include "base/thread_task_runner_handle.h" | |
21 #include "base/threading/thread.h" | |
22 #include "base/threading/thread_restrictions.h" | |
23 #include "base/time/time.h" | |
24 #include "mojo/common/message_pump_mojo.h" | |
25 #include "mojo/common/message_pump_mojo_handler.h" | |
26 #include "mojo/common/time_helper.h" | |
27 | |
28 namespace mojo { | |
29 namespace common { | |
30 | |
31 typedef int WatcherID; | |
32 | |
33 namespace { | |
34 | |
35 const char kWatcherThreadName[] = "handle-watcher-thread"; | |
36 | |
37 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { | |
38 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : | |
39 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); | |
40 } | |
41 | |
42 // Tracks the data for a single call to Start(). | |
43 struct WatchData { | |
44 WatchData() | |
45 : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {} | |
46 | |
47 WatcherID id; | |
48 Handle handle; | |
49 MojoHandleSignals handle_signals; | |
50 base::TimeTicks deadline; | |
51 base::Callback<void(MojoResult)> callback; | |
52 scoped_refptr<base::SingleThreadTaskRunner> task_runner; | |
53 }; | |
54 | |
55 // WatcherBackend -------------------------------------------------------------- | |
56 | |
57 // WatcherBackend is responsible for managing the requests and interacting with | |
58 // MessagePumpMojo. All access (outside of creation/destruction) is done on the | |
59 // thread WatcherThreadManager creates. | |
60 class WatcherBackend : public MessagePumpMojoHandler { | |
61 public: | |
62 WatcherBackend(); | |
63 ~WatcherBackend() override; | |
64 | |
65 void StartWatching(const WatchData& data); | |
66 | |
67 // Cancels a previously scheduled request to start a watch. | |
68 void StopWatching(WatcherID watcher_id); | |
69 | |
70 private: | |
71 typedef std::map<Handle, WatchData> HandleToWatchDataMap; | |
72 | |
73 // Invoked when a handle needs to be removed and notified. | |
74 void RemoveAndNotify(const Handle& handle, MojoResult result); | |
75 | |
76 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found | |
77 // and sets |handle| to the Handle. Returns false if not a known id. | |
78 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; | |
79 | |
80 // MessagePumpMojoHandler overrides: | |
81 void OnHandleReady(const Handle& handle) override; | |
82 void OnHandleError(const Handle& handle, MojoResult result) override; | |
83 | |
84 // Maps from assigned id to WatchData. | |
85 HandleToWatchDataMap handle_to_data_; | |
86 | |
87 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); | |
88 }; | |
89 | |
90 WatcherBackend::WatcherBackend() { | |
91 } | |
92 | |
93 WatcherBackend::~WatcherBackend() { | |
94 } | |
95 | |
96 void WatcherBackend::StartWatching(const WatchData& data) { | |
97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); | |
98 | |
99 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); | |
100 | |
101 handle_to_data_[data.handle] = data; | |
102 MessagePumpMojo::current()->AddHandler(this, data.handle, | |
103 data.handle_signals, | |
104 data.deadline); | |
105 } | |
106 | |
107 void WatcherBackend::StopWatching(WatcherID watcher_id) { | |
108 // Because of the thread hop it is entirely possible to get here and not | |
109 // have a valid handle registered for |watcher_id|. | |
110 Handle handle; | |
111 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | |
112 handle_to_data_.erase(handle); | |
113 MessagePumpMojo::current()->RemoveHandler(handle); | |
114 } | |
115 } | |
116 | |
117 void WatcherBackend::RemoveAndNotify(const Handle& handle, | |
118 MojoResult result) { | |
119 if (handle_to_data_.count(handle) == 0) | |
120 return; | |
121 | |
122 const WatchData data(handle_to_data_[handle]); | |
123 handle_to_data_.erase(handle); | |
124 MessagePumpMojo::current()->RemoveHandler(handle); | |
125 | |
126 data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result)); | |
127 } | |
128 | |
129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, | |
130 Handle* handle) const { | |
131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); | |
132 i != handle_to_data_.end(); ++i) { | |
133 if (i->second.id == watcher_id) { | |
134 *handle = i->second.handle; | |
135 return true; | |
136 } | |
137 } | |
138 return false; | |
139 } | |
140 | |
141 void WatcherBackend::OnHandleReady(const Handle& handle) { | |
142 RemoveAndNotify(handle, MOJO_RESULT_OK); | |
143 } | |
144 | |
145 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { | |
146 RemoveAndNotify(handle, result); | |
147 } | |
148 | |
149 // WatcherThreadManager -------------------------------------------------------- | |
150 | |
151 // WatcherThreadManager manages the background thread that listens for handles | |
152 // to be ready. All requests are handled by WatcherBackend. | |
153 } // namespace | |
154 | |
155 class WatcherThreadManager { | |
156 public: | |
157 ~WatcherThreadManager(); | |
158 | |
159 // Returns the shared instance. | |
160 static WatcherThreadManager* GetInstance(); | |
161 | |
162 // Starts watching the requested handle. Returns a unique ID that is used to | |
163 // stop watching the handle. When the handle is ready |callback| is notified | |
164 // on the thread StartWatching() was invoked on. | |
165 // This may be invoked on any thread. | |
166 WatcherID StartWatching(const Handle& handle, | |
167 MojoHandleSignals handle_signals, | |
168 base::TimeTicks deadline, | |
169 const base::Callback<void(MojoResult)>& callback); | |
170 | |
171 // Stops watching a handle. | |
172 // This may be invoked on any thread. | |
173 void StopWatching(WatcherID watcher_id); | |
174 | |
175 private: | |
176 enum RequestType { | |
177 REQUEST_START, | |
178 REQUEST_STOP, | |
179 }; | |
180 | |
181 // See description of |requests_| for details. | |
182 struct RequestData { | |
183 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} | |
184 | |
185 RequestType type; | |
186 WatchData start_data; | |
187 WatcherID stop_id; | |
188 base::WaitableEvent* stop_event; | |
189 }; | |
190 | |
191 typedef std::vector<RequestData> Requests; | |
192 | |
193 friend struct DefaultSingletonTraits<WatcherThreadManager>; | |
194 | |
195 WatcherThreadManager(); | |
196 | |
197 // Schedules a request on the background thread. See |requests_| for details. | |
198 void AddRequest(const RequestData& data); | |
199 | |
200 // Processes requests added to |requests_|. This is invoked on the backend | |
201 // thread. | |
202 void ProcessRequestsOnBackendThread(); | |
203 | |
204 base::Thread thread_; | |
205 | |
206 base::AtomicSequenceNumber watcher_id_generator_; | |
207 | |
208 WatcherBackend backend_; | |
209 | |
210 // Protects |requests_|. | |
211 base::Lock lock_; | |
212 | |
213 // Start/Stop result in adding a RequestData to |requests_| (protected by | |
214 // |lock_|). When the background thread wakes up it processes the requests. | |
215 Requests requests_; | |
216 | |
217 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); | |
218 }; | |
219 | |
220 WatcherThreadManager::~WatcherThreadManager() { | |
221 thread_.Stop(); | |
222 } | |
223 | |
224 WatcherThreadManager* WatcherThreadManager::GetInstance() { | |
225 return Singleton<WatcherThreadManager>::get(); | |
226 } | |
227 | |
228 WatcherID WatcherThreadManager::StartWatching( | |
229 const Handle& handle, | |
230 MojoHandleSignals handle_signals, | |
231 base::TimeTicks deadline, | |
232 const base::Callback<void(MojoResult)>& callback) { | |
233 RequestData request_data; | |
234 request_data.type = REQUEST_START; | |
235 request_data.start_data.id = watcher_id_generator_.GetNext(); | |
236 request_data.start_data.handle = handle; | |
237 request_data.start_data.callback = callback; | |
238 request_data.start_data.handle_signals = handle_signals; | |
239 request_data.start_data.deadline = deadline; | |
240 request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get(); | |
241 AddRequest(request_data); | |
242 return request_data.start_data.id; | |
243 } | |
244 | |
245 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { | |
246 // Handle the case of StartWatching() followed by StopWatching() before | |
247 // |thread_| woke up. | |
248 { | |
249 base::AutoLock auto_lock(lock_); | |
250 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { | |
251 if (i->type == REQUEST_START && i->start_data.id == watcher_id) { | |
252 // Watcher ids are not reused, so if we find it we can stop. | |
253 requests_.erase(i); | |
254 return; | |
255 } | |
256 } | |
257 } | |
258 | |
259 base::ThreadRestrictions::ScopedAllowWait allow_wait; | |
260 base::WaitableEvent event(true, false); | |
261 RequestData request_data; | |
262 request_data.type = REQUEST_STOP; | |
263 request_data.stop_id = watcher_id; | |
264 request_data.stop_event = &event; | |
265 AddRequest(request_data); | |
266 | |
267 // We need to block until the handle is actually removed. | |
268 event.Wait(); | |
269 } | |
270 | |
271 void WatcherThreadManager::AddRequest(const RequestData& data) { | |
272 { | |
273 base::AutoLock auto_lock(lock_); | |
274 const bool was_empty = requests_.empty(); | |
275 requests_.push_back(data); | |
276 if (!was_empty) | |
277 return; | |
278 } | |
279 // We own |thread_|, so it's safe to use Unretained() here. | |
280 thread_.task_runner()->PostTask( | |
281 FROM_HERE, | |
282 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, | |
283 base::Unretained(this))); | |
284 } | |
285 | |
286 void WatcherThreadManager::ProcessRequestsOnBackendThread() { | |
287 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); | |
288 | |
289 Requests requests; | |
290 { | |
291 base::AutoLock auto_lock(lock_); | |
292 requests_.swap(requests); | |
293 } | |
294 for (size_t i = 0; i < requests.size(); ++i) { | |
295 if (requests[i].type == REQUEST_START) { | |
296 backend_.StartWatching(requests[i].start_data); | |
297 } else { | |
298 backend_.StopWatching(requests[i].stop_id); | |
299 requests[i].stop_event->Signal(); | |
300 } | |
301 } | |
302 } | |
303 | |
304 WatcherThreadManager::WatcherThreadManager() | |
305 : thread_(kWatcherThreadName) { | |
306 base::Thread::Options thread_options; | |
307 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); | |
308 thread_.StartWithOptions(thread_options); | |
309 } | |
310 | |
311 // HandleWatcher::StateBase and subclasses ------------------------------------- | |
312 | |
313 // The base class of HandleWatcher's state. Owns the user's callback and | |
314 // monitors the current thread's MessageLoop to know when to force the callback | |
315 // to run (with an error) even though the pipe hasn't been signaled yet. | |
316 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { | |
317 public: | |
318 StateBase(HandleWatcher* watcher, | |
319 const base::Callback<void(MojoResult)>& callback) | |
320 : watcher_(watcher), | |
321 callback_(callback), | |
322 got_ready_(false) { | |
323 base::MessageLoop::current()->AddDestructionObserver(this); | |
324 } | |
325 | |
326 ~StateBase() override { | |
327 base::MessageLoop::current()->RemoveDestructionObserver(this); | |
328 } | |
329 | |
330 protected: | |
331 void NotifyHandleReady(MojoResult result) { | |
332 got_ready_ = true; | |
333 NotifyAndDestroy(result); | |
334 } | |
335 | |
336 bool got_ready() const { return got_ready_; } | |
337 | |
338 private: | |
339 void WillDestroyCurrentMessageLoop() override { | |
340 // The current thread is exiting. Simulate a watch error. | |
341 NotifyAndDestroy(MOJO_RESULT_ABORTED); | |
342 } | |
343 | |
344 void NotifyAndDestroy(MojoResult result) { | |
345 base::Callback<void(MojoResult)> callback = callback_; | |
346 watcher_->Stop(); // Destroys |this|. | |
347 | |
348 callback.Run(result); | |
349 } | |
350 | |
351 HandleWatcher* watcher_; | |
352 base::Callback<void(MojoResult)> callback_; | |
353 | |
354 // Have we been notified that the handle is ready? | |
355 bool got_ready_; | |
356 | |
357 DISALLOW_COPY_AND_ASSIGN(StateBase); | |
358 }; | |
359 | |
360 // If the thread on which HandleWatcher is used runs MessagePumpMojo, | |
361 // SameThreadWatchingState is used to directly watch the handle on the same | |
362 // thread. | |
363 class HandleWatcher::SameThreadWatchingState : public StateBase, | |
364 public MessagePumpMojoHandler { | |
365 public: | |
366 SameThreadWatchingState(HandleWatcher* watcher, | |
367 const Handle& handle, | |
368 MojoHandleSignals handle_signals, | |
369 MojoDeadline deadline, | |
370 const base::Callback<void(MojoResult)>& callback) | |
371 : StateBase(watcher, callback), | |
372 handle_(handle) { | |
373 DCHECK(MessagePumpMojo::IsCurrent()); | |
374 | |
375 MessagePumpMojo::current()->AddHandler( | |
376 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); | |
377 } | |
378 | |
379 ~SameThreadWatchingState() override { | |
380 if (!got_ready()) | |
381 MessagePumpMojo::current()->RemoveHandler(handle_); | |
382 } | |
383 | |
384 private: | |
385 // MessagePumpMojoHandler overrides: | |
386 void OnHandleReady(const Handle& handle) override { | |
387 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK); | |
388 } | |
389 | |
390 void OnHandleError(const Handle& handle, MojoResult result) override { | |
391 StopWatchingAndNotifyReady(handle, result); | |
392 } | |
393 | |
394 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) { | |
395 DCHECK_EQ(handle.value(), handle_.value()); | |
396 MessagePumpMojo::current()->RemoveHandler(handle_); | |
397 NotifyHandleReady(result); | |
398 } | |
399 | |
400 Handle handle_; | |
401 | |
402 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); | |
403 }; | |
404 | |
405 // If the thread on which HandleWatcher is used runs a message pump different | |
406 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the | |
407 // handle on the handle watcher thread. | |
408 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { | |
409 public: | |
410 SecondaryThreadWatchingState(HandleWatcher* watcher, | |
411 const Handle& handle, | |
412 MojoHandleSignals handle_signals, | |
413 MojoDeadline deadline, | |
414 const base::Callback<void(MojoResult)>& callback) | |
415 : StateBase(watcher, callback), | |
416 weak_factory_(this) { | |
417 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( | |
418 handle, | |
419 handle_signals, | |
420 MojoDeadlineToTimeTicks(deadline), | |
421 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, | |
422 weak_factory_.GetWeakPtr())); | |
423 } | |
424 | |
425 ~SecondaryThreadWatchingState() override { | |
426 // If we've been notified the handle is ready (|got_ready()| is true) then | |
427 // the watch has been implicitly removed by | |
428 // WatcherThreadManager/MessagePumpMojo and we don't have to call | |
429 // StopWatching(). To do so would needlessly entail posting a task and | |
430 // blocking until the background thread services it. | |
431 if (!got_ready()) | |
432 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); | |
433 } | |
434 | |
435 private: | |
436 WatcherID watcher_id_; | |
437 | |
438 // Used to weakly bind |this| to the WatcherThreadManager. | |
439 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; | |
440 | |
441 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); | |
442 }; | |
443 | |
444 // HandleWatcher --------------------------------------------------------------- | |
445 | |
446 HandleWatcher::HandleWatcher() { | |
447 } | |
448 | |
449 HandleWatcher::~HandleWatcher() { | |
450 } | |
451 | |
452 void HandleWatcher::Start(const Handle& handle, | |
453 MojoHandleSignals handle_signals, | |
454 MojoDeadline deadline, | |
455 const base::Callback<void(MojoResult)>& callback) { | |
456 DCHECK(handle.is_valid()); | |
457 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); | |
458 | |
459 // Need to clear the state before creating a new one. | |
460 state_.reset(); | |
461 if (MessagePumpMojo::IsCurrent()) { | |
462 state_.reset(new SameThreadWatchingState( | |
463 this, handle, handle_signals, deadline, callback)); | |
464 } else { | |
465 state_.reset(new SecondaryThreadWatchingState( | |
466 this, handle, handle_signals, deadline, callback)); | |
467 } | |
468 } | |
469 | |
470 void HandleWatcher::Stop() { | |
471 state_.reset(); | |
472 } | |
473 | |
474 } // namespace common | |
475 } // namespace mojo | |
OLD | NEW |