Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(30)

Side by Side Diff: mojo/common/handle_watcher.cc

Issue 1841863002: Update monet. (Closed) Base URL: https://github.com/domokit/monet.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/common/handle_watcher.h ('k') | mojo/common/handle_watcher_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/common/handle_watcher.h"
6
7 #include <map>
8
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/singleton.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/message_loop/message_loop_proxy.h"
18 #include "base/synchronization/lock.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/threading/thread.h"
21 #include "base/threading/thread_restrictions.h"
22 #include "base/time/time.h"
23 #include "mojo/common/message_pump_mojo.h"
24 #include "mojo/common/message_pump_mojo_handler.h"
25 #include "mojo/common/time_helper.h"
26
27 namespace mojo {
28 namespace common {
29
30 typedef int WatcherID;
31
32 namespace {
33
34 const char kWatcherThreadName[] = "handle-watcher-thread";
35
36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
37 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
39 }
40
41 // Tracks the data for a single call to Start().
42 struct WatchData {
43 WatchData()
44 : id(0),
45 handle_signals(MOJO_HANDLE_SIGNAL_NONE),
46 message_loop(NULL) {}
47
48 WatcherID id;
49 Handle handle;
50 MojoHandleSignals handle_signals;
51 base::TimeTicks deadline;
52 base::Callback<void(MojoResult)> callback;
53 scoped_refptr<base::MessageLoopProxy> message_loop;
54 };
55
56 // WatcherBackend --------------------------------------------------------------
57
58 // WatcherBackend is responsible for managing the requests and interacting with
59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
60 // thread WatcherThreadManager creates.
61 class WatcherBackend : public MessagePumpMojoHandler {
62 public:
63 WatcherBackend();
64 ~WatcherBackend() override;
65
66 void StartWatching(const WatchData& data);
67
68 // Cancels a previously scheduled request to start a watch.
69 void StopWatching(WatcherID watcher_id);
70
71 private:
72 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
73
74 // Invoked when a handle needs to be removed and notified.
75 void RemoveAndNotify(const Handle& handle, MojoResult result);
76
77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
78 // and sets |handle| to the Handle. Returns false if not a known id.
79 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
80
81 // MessagePumpMojoHandler overrides:
82 void OnHandleReady(const Handle& handle) override;
83 void OnHandleError(const Handle& handle, MojoResult result) override;
84
85 // Maps from assigned id to WatchData.
86 HandleToWatchDataMap handle_to_data_;
87
88 DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
89 };
90
91 WatcherBackend::WatcherBackend() {
92 }
93
94 WatcherBackend::~WatcherBackend() {
95 }
96
97 void WatcherBackend::StartWatching(const WatchData& data) {
98 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
99
100 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
101
102 handle_to_data_[data.handle] = data;
103 MessagePumpMojo::current()->AddHandler(this, data.handle,
104 data.handle_signals,
105 data.deadline);
106 }
107
108 void WatcherBackend::StopWatching(WatcherID watcher_id) {
109 // Because of the thread hop it is entirely possible to get here and not
110 // have a valid handle registered for |watcher_id|.
111 Handle handle;
112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
113 handle_to_data_.erase(handle);
114 MessagePumpMojo::current()->RemoveHandler(handle);
115 }
116 }
117
118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
119 MojoResult result) {
120 if (handle_to_data_.count(handle) == 0)
121 return;
122
123 const WatchData data(handle_to_data_[handle]);
124 handle_to_data_.erase(handle);
125 MessagePumpMojo::current()->RemoveHandler(handle);
126
127 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
128 }
129
130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
131 Handle* handle) const {
132 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
133 i != handle_to_data_.end(); ++i) {
134 if (i->second.id == watcher_id) {
135 *handle = i->second.handle;
136 return true;
137 }
138 }
139 return false;
140 }
141
142 void WatcherBackend::OnHandleReady(const Handle& handle) {
143 RemoveAndNotify(handle, MOJO_RESULT_OK);
144 }
145
146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
147 RemoveAndNotify(handle, result);
148 }
149
150 // WatcherThreadManager --------------------------------------------------------
151
152 // WatcherThreadManager manages the background thread that listens for handles
153 // to be ready. All requests are handled by WatcherBackend.
154 } // namespace
155
156 class WatcherThreadManager {
157 public:
158 ~WatcherThreadManager();
159
160 // Returns the shared instance.
161 static WatcherThreadManager* GetInstance();
162
163 // Starts watching the requested handle. Returns a unique ID that is used to
164 // stop watching the handle. When the handle is ready |callback| is notified
165 // on the thread StartWatching() was invoked on.
166 // This may be invoked on any thread.
167 WatcherID StartWatching(const Handle& handle,
168 MojoHandleSignals handle_signals,
169 base::TimeTicks deadline,
170 const base::Callback<void(MojoResult)>& callback);
171
172 // Stops watching a handle.
173 // This may be invoked on any thread.
174 void StopWatching(WatcherID watcher_id);
175
176 private:
177 enum RequestType {
178 REQUEST_START,
179 REQUEST_STOP,
180 };
181
182 // See description of |requests_| for details.
183 struct RequestData {
184 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
185
186 RequestType type;
187 WatchData start_data;
188 WatcherID stop_id;
189 base::WaitableEvent* stop_event;
190 };
191
192 typedef std::vector<RequestData> Requests;
193
194 friend struct DefaultSingletonTraits<WatcherThreadManager>;
195
196 WatcherThreadManager();
197
198 // Schedules a request on the background thread. See |requests_| for details.
199 void AddRequest(const RequestData& data);
200
201 // Processes requests added to |requests_|. This is invoked on the backend
202 // thread.
203 void ProcessRequestsOnBackendThread();
204
205 base::Thread thread_;
206
207 base::AtomicSequenceNumber watcher_id_generator_;
208
209 WatcherBackend backend_;
210
211 // Protects |requests_|.
212 base::Lock lock_;
213
214 // Start/Stop result in adding a RequestData to |requests_| (protected by
215 // |lock_|). When the background thread wakes up it processes the requests.
216 Requests requests_;
217
218 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
219 };
220
221 WatcherThreadManager::~WatcherThreadManager() {
222 thread_.Stop();
223 }
224
225 WatcherThreadManager* WatcherThreadManager::GetInstance() {
226 return Singleton<WatcherThreadManager>::get();
227 }
228
229 WatcherID WatcherThreadManager::StartWatching(
230 const Handle& handle,
231 MojoHandleSignals handle_signals,
232 base::TimeTicks deadline,
233 const base::Callback<void(MojoResult)>& callback) {
234 RequestData request_data;
235 request_data.type = REQUEST_START;
236 request_data.start_data.id = watcher_id_generator_.GetNext();
237 request_data.start_data.handle = handle;
238 request_data.start_data.callback = callback;
239 request_data.start_data.handle_signals = handle_signals;
240 request_data.start_data.deadline = deadline;
241 request_data.start_data.message_loop = base::MessageLoopProxy::current();
242 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
243 request_data.start_data.message_loop.get());
244 AddRequest(request_data);
245 return request_data.start_data.id;
246 }
247
248 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
249 // Handle the case of StartWatching() followed by StopWatching() before
250 // |thread_| woke up.
251 {
252 base::AutoLock auto_lock(lock_);
253 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
254 if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
255 // Watcher ids are not reused, so if we find it we can stop.
256 requests_.erase(i);
257 return;
258 }
259 }
260 }
261
262 base::ThreadRestrictions::ScopedAllowWait allow_wait;
263 base::WaitableEvent event(true, false);
264 RequestData request_data;
265 request_data.type = REQUEST_STOP;
266 request_data.stop_id = watcher_id;
267 request_data.stop_event = &event;
268 AddRequest(request_data);
269
270 // We need to block until the handle is actually removed.
271 event.Wait();
272 }
273
274 void WatcherThreadManager::AddRequest(const RequestData& data) {
275 {
276 base::AutoLock auto_lock(lock_);
277 const bool was_empty = requests_.empty();
278 requests_.push_back(data);
279 if (!was_empty)
280 return;
281 }
282 // We own |thread_|, so it's safe to use Unretained() here.
283 thread_.message_loop()->PostTask(
284 FROM_HERE,
285 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
286 base::Unretained(this)));
287 }
288
289 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
290 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
291
292 Requests requests;
293 {
294 base::AutoLock auto_lock(lock_);
295 requests_.swap(requests);
296 }
297 for (size_t i = 0; i < requests.size(); ++i) {
298 if (requests[i].type == REQUEST_START) {
299 backend_.StartWatching(requests[i].start_data);
300 } else {
301 backend_.StopWatching(requests[i].stop_id);
302 requests[i].stop_event->Signal();
303 }
304 }
305 }
306
307 WatcherThreadManager::WatcherThreadManager()
308 : thread_(kWatcherThreadName) {
309 base::Thread::Options thread_options;
310 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
311 thread_.StartWithOptions(thread_options);
312 }
313
314 // HandleWatcher::StateBase and subclasses -------------------------------------
315
316 // The base class of HandleWatcher's state. Owns the user's callback and
317 // monitors the current thread's MessageLoop to know when to force the callback
318 // to run (with an error) even though the pipe hasn't been signaled yet.
319 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
320 public:
321 StateBase(HandleWatcher* watcher,
322 const base::Callback<void(MojoResult)>& callback)
323 : watcher_(watcher),
324 callback_(callback),
325 got_ready_(false) {
326 base::MessageLoop::current()->AddDestructionObserver(this);
327 }
328
329 ~StateBase() override {
330 base::MessageLoop::current()->RemoveDestructionObserver(this);
331 }
332
333 protected:
334 void NotifyHandleReady(MojoResult result) {
335 got_ready_ = true;
336 NotifyAndDestroy(result);
337 }
338
339 bool got_ready() const { return got_ready_; }
340
341 private:
342 void WillDestroyCurrentMessageLoop() override {
343 // The current thread is exiting. Simulate a watch error.
344 NotifyAndDestroy(MOJO_RESULT_ABORTED);
345 }
346
347 void NotifyAndDestroy(MojoResult result) {
348 base::Callback<void(MojoResult)> callback = callback_;
349 watcher_->Stop(); // Destroys |this|.
350
351 callback.Run(result);
352 }
353
354 HandleWatcher* watcher_;
355 base::Callback<void(MojoResult)> callback_;
356
357 // Have we been notified that the handle is ready?
358 bool got_ready_;
359
360 DISALLOW_COPY_AND_ASSIGN(StateBase);
361 };
362
363 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
364 // SameThreadWatchingState is used to directly watch the handle on the same
365 // thread.
366 class HandleWatcher::SameThreadWatchingState : public StateBase,
367 public MessagePumpMojoHandler {
368 public:
369 SameThreadWatchingState(HandleWatcher* watcher,
370 const Handle& handle,
371 MojoHandleSignals handle_signals,
372 MojoDeadline deadline,
373 const base::Callback<void(MojoResult)>& callback)
374 : StateBase(watcher, callback),
375 handle_(handle) {
376 DCHECK(MessagePumpMojo::IsCurrent());
377
378 MessagePumpMojo::current()->AddHandler(
379 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
380 }
381
382 ~SameThreadWatchingState() override {
383 if (!got_ready())
384 MessagePumpMojo::current()->RemoveHandler(handle_);
385 }
386
387 private:
388 // MessagePumpMojoHandler overrides:
389 void OnHandleReady(const Handle& handle) override {
390 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
391 }
392
393 void OnHandleError(const Handle& handle, MojoResult result) override {
394 StopWatchingAndNotifyReady(handle, result);
395 }
396
397 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
398 DCHECK_EQ(handle.value(), handle_.value());
399 MessagePumpMojo::current()->RemoveHandler(handle_);
400 NotifyHandleReady(result);
401 }
402
403 Handle handle_;
404
405 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
406 };
407
408 // If the thread on which HandleWatcher is used runs a message pump different
409 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
410 // handle on the handle watcher thread.
411 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
412 public:
413 SecondaryThreadWatchingState(HandleWatcher* watcher,
414 const Handle& handle,
415 MojoHandleSignals handle_signals,
416 MojoDeadline deadline,
417 const base::Callback<void(MojoResult)>& callback)
418 : StateBase(watcher, callback),
419 weak_factory_(this) {
420 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
421 handle,
422 handle_signals,
423 MojoDeadlineToTimeTicks(deadline),
424 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
425 weak_factory_.GetWeakPtr()));
426 }
427
428 ~SecondaryThreadWatchingState() override {
429 // If we've been notified the handle is ready (|got_ready()| is true) then
430 // the watch has been implicitly removed by
431 // WatcherThreadManager/MessagePumpMojo and we don't have to call
432 // StopWatching(). To do so would needlessly entail posting a task and
433 // blocking until the background thread services it.
434 if (!got_ready())
435 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
436 }
437
438 private:
439 WatcherID watcher_id_;
440
441 // Used to weakly bind |this| to the WatcherThreadManager.
442 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
443
444 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
445 };
446
447 // HandleWatcher ---------------------------------------------------------------
448
449 HandleWatcher::HandleWatcher() {
450 }
451
452 HandleWatcher::~HandleWatcher() {
453 }
454
455 void HandleWatcher::Start(const Handle& handle,
456 MojoHandleSignals handle_signals,
457 MojoDeadline deadline,
458 const base::Callback<void(MojoResult)>& callback) {
459 DCHECK(handle.is_valid());
460 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
461
462 // Need to clear the state before creating a new one.
463 state_.reset();
464 if (MessagePumpMojo::IsCurrent()) {
465 state_.reset(new SameThreadWatchingState(
466 this, handle, handle_signals, deadline, callback));
467 } else {
468 state_.reset(new SecondaryThreadWatchingState(
469 this, handle, handle_signals, deadline, callback));
470 }
471 }
472
473 void HandleWatcher::Stop() {
474 state_.reset();
475 }
476
477 } // namespace common
478 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/common/handle_watcher.h ('k') | mojo/common/handle_watcher_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698