OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/message_pump/handle_watcher.h" | 5 #include "mojo/message_pump/handle_watcher.h" |
6 | 6 |
7 #include <map> | 7 #include <map> |
8 | 8 |
9 #include "base/atomic_sequence_num.h" | 9 #include "base/atomic_sequence_num.h" |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/lazy_instance.h" | 11 #include "base/lazy_instance.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/macros.h" | 13 #include "base/macros.h" |
14 #include "base/memory/singleton.h" | 14 #include "base/memory/singleton.h" |
15 #include "base/memory/weak_ptr.h" | 15 #include "base/memory/weak_ptr.h" |
16 #include "base/message_loop/message_loop.h" | 16 #include "base/message_loop/message_loop.h" |
17 #include "base/profiler/scoped_tracker.h" | 17 #include "base/profiler/scoped_tracker.h" |
18 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
19 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
20 #include "base/synchronization/waitable_event.h" | |
21 #include "base/thread_task_runner_handle.h" | 20 #include "base/thread_task_runner_handle.h" |
22 #include "base/threading/thread.h" | 21 #include "base/threading/thread.h" |
23 #include "base/threading/thread_restrictions.h" | |
24 #include "base/time/time.h" | 22 #include "base/time/time.h" |
25 #include "mojo/message_pump/message_pump_mojo.h" | 23 #include "mojo/message_pump/message_pump_mojo.h" |
26 #include "mojo/message_pump/message_pump_mojo_handler.h" | 24 #include "mojo/message_pump/message_pump_mojo_handler.h" |
27 #include "mojo/message_pump/time_helper.h" | 25 #include "mojo/message_pump/time_helper.h" |
28 | 26 |
29 namespace mojo { | 27 namespace mojo { |
30 namespace common { | 28 namespace common { |
31 | 29 |
32 typedef int WatcherID; | 30 typedef int WatcherID; |
33 | 31 |
(...skipping 23 matching lines...) Expand all Loading... |
57 | 55 |
58 // WatcherBackend is responsible for managing the requests and interacting with | 56 // WatcherBackend is responsible for managing the requests and interacting with |
59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the | 57 // MessagePumpMojo. All access (outside of creation/destruction) is done on the |
60 // thread WatcherThreadManager creates. | 58 // thread WatcherThreadManager creates. |
61 class WatcherBackend : public MessagePumpMojoHandler { | 59 class WatcherBackend : public MessagePumpMojoHandler { |
62 public: | 60 public: |
63 WatcherBackend(); | 61 WatcherBackend(); |
64 ~WatcherBackend() override; | 62 ~WatcherBackend() override; |
65 | 63 |
66 void StartWatching(const WatchData& data); | 64 void StartWatching(const WatchData& data); |
67 | |
68 // Cancels a previously scheduled request to start a watch. | |
69 void StopWatching(WatcherID watcher_id); | 65 void StopWatching(WatcherID watcher_id); |
70 | 66 |
71 private: | 67 private: |
72 typedef std::map<Handle, WatchData> HandleToWatchDataMap; | 68 typedef std::map<Handle, WatchData> HandleToWatchDataMap; |
73 | 69 |
74 // Invoked when a handle needs to be removed and notified. | 70 // Invoked when a handle needs to be removed and notified. |
75 void RemoveAndNotify(const Handle& handle, MojoResult result); | 71 void RemoveAndNotify(const Handle& handle, MojoResult result); |
76 | 72 |
77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found | 73 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found |
78 // and sets |handle| to the Handle. Returns false if not a known id. | 74 // and sets |handle| to the Handle. Returns false if not a known id. |
(...skipping 23 matching lines...) Expand all Loading... |
102 handle_to_data_[data.handle] = data; | 98 handle_to_data_[data.handle] = data; |
103 MessagePumpMojo::current()->AddHandler(this, data.handle, | 99 MessagePumpMojo::current()->AddHandler(this, data.handle, |
104 data.handle_signals, | 100 data.handle_signals, |
105 data.deadline); | 101 data.deadline); |
106 } | 102 } |
107 | 103 |
108 void WatcherBackend::StopWatching(WatcherID watcher_id) { | 104 void WatcherBackend::StopWatching(WatcherID watcher_id) { |
109 // Because of the thread hop it is entirely possible to get here and not | 105 // Because of the thread hop it is entirely possible to get here and not |
110 // have a valid handle registered for |watcher_id|. | 106 // have a valid handle registered for |watcher_id|. |
111 Handle handle; | 107 Handle handle; |
112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { | 108 if (!GetMojoHandleByWatcherID(watcher_id, &handle)) |
113 handle_to_data_.erase(handle); | 109 return; |
114 MessagePumpMojo::current()->RemoveHandler(handle); | 110 |
115 } | 111 handle_to_data_.erase(handle); |
| 112 MessagePumpMojo::current()->RemoveHandler(handle); |
116 } | 113 } |
117 | 114 |
118 void WatcherBackend::RemoveAndNotify(const Handle& handle, | 115 void WatcherBackend::RemoveAndNotify(const Handle& handle, |
119 MojoResult result) { | 116 MojoResult result) { |
120 if (handle_to_data_.count(handle) == 0) | 117 if (handle_to_data_.count(handle) == 0) |
121 return; | 118 return; |
122 | 119 |
123 const WatchData data(handle_to_data_[handle]); | 120 const WatchData data(handle_to_data_[handle]); |
124 handle_to_data_.erase(handle); | 121 handle_to_data_.erase(handle); |
125 MessagePumpMojo::current()->RemoveHandler(handle); | 122 MessagePumpMojo::current()->RemoveHandler(handle); |
(...skipping 18 matching lines...) Expand all Loading... |
144 } | 141 } |
145 | 142 |
146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { | 143 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { |
147 RemoveAndNotify(handle, result); | 144 RemoveAndNotify(handle, result); |
148 } | 145 } |
149 | 146 |
150 // WatcherThreadManager -------------------------------------------------------- | 147 // WatcherThreadManager -------------------------------------------------------- |
151 | 148 |
152 // WatcherThreadManager manages the background thread that listens for handles | 149 // WatcherThreadManager manages the background thread that listens for handles |
153 // to be ready. All requests are handled by WatcherBackend. | 150 // to be ready. All requests are handled by WatcherBackend. |
154 } // namespace | |
155 | |
156 class WatcherThreadManager { | 151 class WatcherThreadManager { |
157 public: | 152 public: |
158 ~WatcherThreadManager(); | 153 ~WatcherThreadManager(); |
159 | 154 |
160 // Returns the shared instance. | 155 // Returns the shared instance. |
161 static WatcherThreadManager* GetInstance(); | 156 static WatcherThreadManager* GetInstance(); |
162 | 157 |
163 // Starts watching the requested handle. Returns a unique ID that is used to | 158 // Starts watching the requested handle. Returns a unique ID that is used to |
164 // stop watching the handle. When the handle is ready |callback| is notified | 159 // stop watching the handle. When the handle is ready |callback| is notified |
165 // on the thread StartWatching() was invoked on. | 160 // on the thread StartWatching() was invoked on. |
166 // This may be invoked on any thread. | 161 // This may be invoked on any thread. |
167 WatcherID StartWatching(const Handle& handle, | 162 WatcherID StartWatching(const Handle& handle, |
168 MojoHandleSignals handle_signals, | 163 MojoHandleSignals handle_signals, |
169 base::TimeTicks deadline, | 164 base::TimeTicks deadline, |
170 const base::Callback<void(MojoResult)>& callback); | 165 const base::Callback<void(MojoResult)>& callback); |
171 | 166 |
172 // Stops watching a handle. | 167 // Stops watching a handle. |
173 // This may be invoked on any thread. | 168 // This may be invoked on any thread. |
174 void StopWatching(WatcherID watcher_id); | 169 void StopWatching(WatcherID watcher_id); |
175 | 170 |
176 private: | 171 private: |
177 enum RequestType { | 172 enum RequestType { |
178 REQUEST_START, | 173 REQUEST_START, |
179 REQUEST_STOP, | 174 REQUEST_STOP, |
180 }; | 175 }; |
181 | 176 |
182 // See description of |requests_| for details. | 177 // See description of |requests_| for details. |
183 struct RequestData { | 178 struct RequestData { |
184 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} | 179 RequestData() : type(REQUEST_START), stop_id(0) {} |
185 | 180 |
186 RequestType type; | 181 RequestType type; |
187 WatchData start_data; | 182 WatchData start_data; |
188 WatcherID stop_id; | 183 WatcherID stop_id; |
189 base::WaitableEvent* stop_event; | |
190 }; | 184 }; |
191 | 185 |
192 typedef std::vector<RequestData> Requests; | 186 typedef std::vector<RequestData> Requests; |
193 | 187 |
194 friend struct base::DefaultSingletonTraits<WatcherThreadManager>; | 188 friend struct base::DefaultSingletonTraits<WatcherThreadManager>; |
195 | 189 |
196 WatcherThreadManager(); | 190 WatcherThreadManager(); |
197 | 191 |
198 // Schedules a request on the background thread. See |requests_| for details. | 192 // Schedules a request on the background thread. See |requests_| for details. |
199 void AddRequest(const RequestData& data); | 193 void AddRequest(const RequestData& data); |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
255 return; | 249 return; |
256 } | 250 } |
257 } | 251 } |
258 } | 252 } |
259 | 253 |
260 // TODO(amistry): Remove ScopedTracker below once http://crbug.com/554761 is | 254 // TODO(amistry): Remove ScopedTracker below once http://crbug.com/554761 is |
261 // fixed. | 255 // fixed. |
262 tracked_objects::ScopedTracker tracking_profile( | 256 tracked_objects::ScopedTracker tracking_profile( |
263 FROM_HERE_WITH_EXPLICIT_FUNCTION( | 257 FROM_HERE_WITH_EXPLICIT_FUNCTION( |
264 "554761 WatcherThreadManager::StopWatching")); | 258 "554761 WatcherThreadManager::StopWatching")); |
265 base::ThreadRestrictions::ScopedAllowWait allow_wait; | |
266 base::WaitableEvent event(true, false); | |
267 RequestData request_data; | 259 RequestData request_data; |
268 request_data.type = REQUEST_STOP; | 260 request_data.type = REQUEST_STOP; |
269 request_data.stop_id = watcher_id; | 261 request_data.stop_id = watcher_id; |
270 request_data.stop_event = &event; | |
271 AddRequest(request_data); | 262 AddRequest(request_data); |
272 | |
273 // We need to block until the handle is actually removed. | |
274 event.Wait(); | |
275 } | 263 } |
276 | 264 |
277 void WatcherThreadManager::AddRequest(const RequestData& data) { | 265 void WatcherThreadManager::AddRequest(const RequestData& data) { |
278 { | 266 { |
279 base::AutoLock auto_lock(lock_); | 267 base::AutoLock auto_lock(lock_); |
280 const bool was_empty = requests_.empty(); | 268 const bool was_empty = requests_.empty(); |
281 requests_.push_back(data); | 269 requests_.push_back(data); |
282 if (!was_empty) | 270 if (!was_empty) |
283 return; | 271 return; |
284 } | 272 } |
285 // We own |thread_|, so it's safe to use Unretained() here. | 273 // We outlive |thread_|, so it's safe to use Unretained() here. |
286 thread_.task_runner()->PostTask( | 274 thread_.task_runner()->PostTask( |
287 FROM_HERE, | 275 FROM_HERE, |
288 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, | 276 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, |
289 base::Unretained(this))); | 277 base::Unretained(this))); |
290 } | 278 } |
291 | 279 |
292 void WatcherThreadManager::ProcessRequestsOnBackendThread() { | 280 void WatcherThreadManager::ProcessRequestsOnBackendThread() { |
293 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); | 281 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); |
294 | 282 |
295 Requests requests; | 283 Requests requests; |
296 { | 284 { |
297 base::AutoLock auto_lock(lock_); | 285 base::AutoLock auto_lock(lock_); |
298 requests_.swap(requests); | 286 requests_.swap(requests); |
299 } | 287 } |
300 for (size_t i = 0; i < requests.size(); ++i) { | 288 for (size_t i = 0; i < requests.size(); ++i) { |
301 if (requests[i].type == REQUEST_START) { | 289 if (requests[i].type == REQUEST_START) { |
302 backend_.StartWatching(requests[i].start_data); | 290 backend_.StartWatching(requests[i].start_data); |
303 } else { | 291 } else { |
304 backend_.StopWatching(requests[i].stop_id); | 292 backend_.StopWatching(requests[i].stop_id); |
305 requests[i].stop_event->Signal(); | |
306 } | 293 } |
307 } | 294 } |
308 } | 295 } |
309 | 296 |
310 WatcherThreadManager::WatcherThreadManager() | 297 WatcherThreadManager::WatcherThreadManager() |
311 : thread_(kWatcherThreadName) { | 298 : thread_(kWatcherThreadName) { |
312 base::Thread::Options thread_options; | 299 base::Thread::Options thread_options; |
313 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); | 300 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); |
314 thread_.StartWithOptions(thread_options); | 301 thread_.StartWithOptions(thread_options); |
315 } | 302 } |
316 | 303 |
| 304 } // namespace |
| 305 |
317 // HandleWatcher::StateBase and subclasses ------------------------------------- | 306 // HandleWatcher::StateBase and subclasses ------------------------------------- |
318 | 307 |
319 // The base class of HandleWatcher's state. Owns the user's callback and | 308 // The base class of HandleWatcher's state. Owns the user's callback and |
320 // monitors the current thread's MessageLoop to know when to force the callback | 309 // monitors the current thread's MessageLoop to know when to force the callback |
321 // to run (with an error) even though the pipe hasn't been signaled yet. | 310 // to run (with an error) even though the pipe hasn't been signaled yet. |
322 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { | 311 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { |
323 public: | 312 public: |
324 StateBase(HandleWatcher* watcher, | 313 StateBase(HandleWatcher* watcher, |
325 const base::Callback<void(MojoResult)>& callback) | 314 const base::Callback<void(MojoResult)>& callback) |
326 : watcher_(watcher), | 315 : watcher_(watcher), |
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
472 this, handle, handle_signals, deadline, callback)); | 461 this, handle, handle_signals, deadline, callback)); |
473 } | 462 } |
474 } | 463 } |
475 | 464 |
476 void HandleWatcher::Stop() { | 465 void HandleWatcher::Stop() { |
477 state_.reset(); | 466 state_.reset(); |
478 } | 467 } |
479 | 468 |
480 } // namespace common | 469 } // namespace common |
481 } // namespace mojo | 470 } // namespace mojo |
OLD | NEW |