OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/common/handle_watcher.h" | 5 #include "mojo/common/handle_watcher.h" |
6 | 6 |
7 #include <map> | 7 #include <map> |
8 #include <set> | |
9 #include <vector> | |
10 | 8 |
9 #include "base/atomic_sequence_num.h" | |
11 #include "base/bind.h" | 10 #include "base/bind.h" |
12 #include "base/lazy_instance.h" | 11 #include "base/lazy_instance.h" |
13 #include "base/memory/weak_ptr.h" | 12 #include "base/memory/weak_ptr.h" |
14 #include "base/message_loop/message_loop.h" | 13 #include "base/message_loop/message_loop.h" |
15 #include "base/message_loop/message_loop_proxy.h" | 14 #include "base/message_loop/message_loop_proxy.h" |
16 #include "base/synchronization/lock.h" | |
17 #include "base/threading/thread.h" | 15 #include "base/threading/thread.h" |
18 #include "base/time/tick_clock.h" | 16 #include "base/time/tick_clock.h" |
19 #include "base/time/time.h" | 17 #include "base/time/time.h" |
20 #include "mojo/common/scoped_message_pipe.h" | 18 #include "mojo/common/message_pump_mojo.h" |
19 #include "mojo/common/message_pump_mojo_handler.h" | |
21 | 20 |
22 namespace mojo { | 21 namespace mojo { |
23 namespace common { | 22 namespace common { |
24 | 23 |
25 typedef int WatcherID; | 24 typedef int WatcherID; |
26 | 25 |
27 namespace { | 26 namespace { |
28 | 27 |
29 const char kWatcherThreadName[] = "handle-watcher-thread"; | 28 const char kWatcherThreadName[] = "handle-watcher-thread"; |
30 | 29 |
30 // TODO(sky): shouldn't be necessary to cache this. | |
darin (slow to review)
2013/11/21 18:14:52
This is going to be fixed once we sort out the RTT
sky
2013/11/22 19:39:44
I updated the TODO for now.
| |
31 MessagePumpMojo* message_pump_mojo_ = NULL; | |
darin (slow to review)
2013/11/21 18:14:52
nit: shouldn't end with "_"
sky
2013/11/22 19:39:44
Done.
| |
32 | |
33 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { | |
34 message_pump_mojo_ = new MessagePumpMojo; | |
35 return scoped_ptr<base::MessagePump>(message_pump_mojo_).Pass(); | |
36 } | |
37 | |
38 // Tracks the data for a single call to Start(). | |
39 struct WatchData { | |
40 WatchData() | |
41 : id(0), | |
42 handle(MOJO_HANDLE_INVALID), | |
43 wait_flags(MOJO_WAIT_FLAG_NONE), | |
44 message_loop(NULL) {} | |
45 | |
46 WatcherID id; | |
47 MojoHandle handle; | |
48 MojoWaitFlags wait_flags; | |
49 base::TimeTicks deadline; | |
50 base::Callback<void(MojoResult)> callback; | |
51 scoped_refptr<base::MessageLoopProxy> message_loop; | |
52 }; | |
53 | |
54 // WatcherBackend -------------------------------------------------------------- | |
55 | |
56 // WatcherBackend is responsible for managing the requests and interacting with | |
57 // MessagePumpMojo. All access (outside of creation/destruction) is done on the | |
58 // thread WatcherThreadManager creates. | |
59 class WatcherBackend : public MessagePumpMojoHandler { | |
60 public: | |
61 WatcherBackend(); | |
62 virtual ~WatcherBackend(); | |
63 | |
64 void StartWatching(const WatchData& data); | |
65 void StopWatching(WatcherID watcher_id); | |
66 | |
67 private: | |
68 typedef std::map<MojoHandle, WatchData> HandleToWatchDataMap; | |
69 | |
70 // Invoked when a handle needs to be removed and notified. | |
71 void RemoveAndNotify(MojoHandle handle, MojoResult result); | |
72 | |
73 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found | |
74 // and sets |handle| to the MojoHandle. Returns false if not a known id. | |
75 bool GetMojoHandleByWatcherID(WatcherID watcher_id, MojoHandle* handle) const; | |
76 | |
77 // MessagePumpMojoHandler overrides: | |
78 virtual void OnHandleReady(MojoHandle handle) OVERRIDE; | |
79 virtual void OnHandleError(MojoHandle handle, MojoResult result) OVERRIDE; | |
80 | |
81 // Maps from assigned id to WatchData. | |
82 HandleToWatchDataMap handle_to_data_; | |
83 | |
84 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); | |
85 }; | |
86 | |
87 WatcherBackend::WatcherBackend() { | |
88 } | |
89 | |
90 WatcherBackend::~WatcherBackend() { | |
91 } | |
92 | |
93 void WatcherBackend::StartWatching(const WatchData& data) { | |
94 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); | |
95 | |
96 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); | |
97 | |
98 handle_to_data_[data.handle] = data; | |
99 message_pump_mojo_->AddHandler(this, data.handle, | |
100 data.wait_flags, | |
101 data.deadline); | |
102 } | |
103 | |
104 void WatcherBackend::StopWatching(WatcherID watcher_id) { | |
105 // Because of the thread hop it is entirely possible to get here and not | |
106 // have a valid handle registered for |watcher_id|. | |
107 MojoHandle handle; | |
108 if (!GetMojoHandleByWatcherID(watcher_id, &handle)) | |
109 return; | |
110 | |
111 handle_to_data_.erase(handle); | |
112 message_pump_mojo_->RemoveHandler(handle); | |
113 } | |
114 | |
115 void WatcherBackend::RemoveAndNotify(MojoHandle handle, | |
116 MojoResult result) { | |
117 if (handle_to_data_.count(handle) == 0) | |
118 return; | |
119 | |
120 const WatchData data(handle_to_data_[handle]); | |
121 handle_to_data_.erase(handle); | |
122 message_pump_mojo_->RemoveHandler(handle); | |
123 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); | |
darin (slow to review)
2013/11/21 18:14:52
Hmm, when called from OnHandleReady, this PostTask
sky
2013/11/22 19:39:44
Remember this is on the backend thread, so the Pos
| |
124 } | |
125 | |
126 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, | |
127 MojoHandle* handle) const { | |
128 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); | |
129 i != handle_to_data_.end(); ++i) { | |
130 if (i->second.id == watcher_id) { | |
131 *handle = i->second.handle; | |
132 return true; | |
133 } | |
134 } | |
135 return false; | |
136 } | |
137 | |
138 void WatcherBackend::OnHandleReady(MojoHandle handle) { | |
139 RemoveAndNotify(handle, MOJO_RESULT_OK); | |
140 } | |
141 | |
142 void WatcherBackend::OnHandleError(MojoHandle handle, MojoResult result) { | |
143 RemoveAndNotify(handle, result); | |
144 } | |
145 | |
31 // WatcherThreadManager -------------------------------------------------------- | 146 // WatcherThreadManager -------------------------------------------------------- |
32 | 147 |
33 // WatcherThreadManager manages listening for the handles. It is a singleton. It | 148 // WatcherThreadManager manages the background thread that listens for handles |
34 // spawns a thread that waits for any handle passed to StartWatching() to be | 149 // to be ready. All requests are handled by WatcherBackend. |
35 // ready. Additionally it creates a message pipe for communication between the | |
36 // two threads. The message pipe is used solely to wake up the background | |
37 // thread. This happens when the set of handles changes, or during shutdown. | |
38 class WatcherThreadManager { | 150 class WatcherThreadManager { |
39 public: | 151 public: |
40 // Returns the shared instance. | 152 // Returns the shared instance. |
41 static WatcherThreadManager* GetInstance(); | 153 static WatcherThreadManager* GetInstance(); |
42 | 154 |
43 // Starts watching the requested handle. Returns a unique ID that is used to | 155 // Starts watching the requested handle. Returns a unique ID that is used to |
44 // stop watching the handle. When the handle is ready |callback| is notified | 156 // stop watching the handle. When the handle is ready |callback| is notified |
45 // on the thread StartWatching() was invoked on. | 157 // on the thread StartWatching() was invoked on. |
46 // This may be invoked on any thread. | 158 // This may be invoked on any thread. |
47 WatcherID StartWatching(MojoHandle handle, | 159 WatcherID StartWatching(MojoHandle handle, |
48 MojoWaitFlags wait_flags, | 160 MojoWaitFlags wait_flags, |
49 base::TimeTicks deadline, | 161 base::TimeTicks deadline, |
50 const base::Callback<void(MojoResult)>& callback); | 162 const base::Callback<void(MojoResult)>& callback); |
51 | 163 |
52 // Stops watching a handle. | 164 // Stops watching a handle. |
165 // This may be invoked on any thread. | |
53 void StopWatching(WatcherID watcher_id); | 166 void StopWatching(WatcherID watcher_id); |
54 | 167 |
55 private: | 168 private: |
56 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; | 169 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; |
57 | 170 |
58 // Tracks a single request. | |
59 struct HandleAndCallback { | |
60 HandleAndCallback() | |
61 : handle(MOJO_HANDLE_INVALID), | |
62 wait_flags(MOJO_WAIT_FLAG_NONE), | |
63 message_loop(NULL) {} | |
64 | |
65 MojoHandle handle; | |
66 MojoWaitFlags wait_flags; | |
67 base::TimeTicks deadline; | |
68 base::Callback<void(MojoResult)> callback; | |
69 scoped_refptr<base::MessageLoopProxy> message_loop; | |
70 }; | |
71 | |
72 // Contains the state needed for MojoWaitMany. | |
73 // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to | |
74 // pass to MojoWaitMany. | |
75 struct WaitState { | |
76 // List of ids. | |
77 std::vector<WatcherID> ids; | |
78 | |
79 // List of handles. | |
80 std::vector<MojoHandle> handles; | |
81 | |
82 // List of flags each handle is waiting on. | |
83 std::vector<MojoWaitFlags> wait_flags; | |
84 | |
85 // First deadline. | |
86 MojoDeadline deadline; | |
87 | |
88 // Set of ids whose deadline has been reached. | |
89 std::set<WatcherID> deadline_exceeded; | |
90 }; | |
91 | |
92 typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap; | |
93 | |
94 WatcherThreadManager(); | 171 WatcherThreadManager(); |
95 ~WatcherThreadManager(); | 172 ~WatcherThreadManager(); |
96 | 173 |
97 // Invoked on the background thread. Runs a loop waiting on current set of | |
98 // handles. | |
99 void RunOnBackgroundThread(); | |
100 | |
101 // Writes to the communication pipe to wake up the background thread. | |
102 void SignalBackgroundThread(); | |
103 | |
104 // Invoked when a handle associated with |id| should be removed and notified. | |
105 // |result| gives the reason for removing. | |
106 void RemoveAndNotify(WatcherID id, MojoResult result); | |
107 | |
108 // Removes all callbacks schedule for |handle|. This is used when a handle | |
109 // is identified as invalid. | |
110 void RemoveHandle(MojoHandle handle); | |
111 | |
112 MojoHandle read_handle() const { return control_pipe_.handle_0(); } | |
113 MojoHandle write_handle() const { return control_pipe_.handle_1(); } | |
114 | |
115 // Returns state needed for MojoWaitMany. | |
116 WaitState GetWaitState(); | |
117 | |
118 // Guards members accessed on both threads. | |
119 base::Lock lock_; | |
120 | |
121 // Used for communicating with the background thread. | |
122 ScopedMessagePipe control_pipe_; | |
123 | |
124 base::Thread thread_; | 174 base::Thread thread_; |
125 | 175 |
126 // Maps from assigned id to the callback. | 176 base::AtomicSequenceNumber watcher_id_generator_; |
127 IDToCallbackMap id_to_callback_; | |
128 | 177 |
129 // If true the background loop should exit. | 178 WatcherBackend backend_; |
130 bool quit_; | |
131 | 179 |
132 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); | 180 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
133 }; | 181 }; |
134 | 182 |
135 WatcherThreadManager* WatcherThreadManager::GetInstance() { | 183 WatcherThreadManager* WatcherThreadManager::GetInstance() { |
136 static base::LazyInstance<WatcherThreadManager> instance = | 184 static base::LazyInstance<WatcherThreadManager> instance = |
137 LAZY_INSTANCE_INITIALIZER; | 185 LAZY_INSTANCE_INITIALIZER; |
138 return &instance.Get(); | 186 return &instance.Get(); |
139 } | 187 } |
140 | 188 |
141 WatcherID WatcherThreadManager::StartWatching( | 189 WatcherID WatcherThreadManager::StartWatching( |
142 MojoHandle handle, | 190 MojoHandle handle, |
143 MojoWaitFlags wait_flags, | 191 MojoWaitFlags wait_flags, |
144 base::TimeTicks deadline, | 192 base::TimeTicks deadline, |
145 const base::Callback<void(MojoResult)>& callback) { | 193 const base::Callback<void(MojoResult)>& callback) { |
146 WatcherID id = 0; | 194 WatchData data; |
147 { | 195 data.id = watcher_id_generator_.GetNext(); |
148 static int next_id = 0; | 196 data.handle = handle; |
149 base::AutoLock lock(lock_); | 197 data.callback = callback; |
150 // TODO(sky): worry about overflow? | 198 data.wait_flags = wait_flags; |
151 id = ++next_id; | 199 data.deadline = deadline; |
152 id_to_callback_[id].handle = handle; | 200 data.message_loop = base::MessageLoopProxy::current(); |
153 id_to_callback_[id].callback = callback; | 201 // We outlive |thread_|, so it's safe to use Unretained() here. |
154 id_to_callback_[id].wait_flags = wait_flags; | 202 thread_.message_loop()->PostTask( |
155 id_to_callback_[id].deadline = deadline; | 203 FROM_HERE, |
156 id_to_callback_[id].message_loop = base::MessageLoopProxy::current(); | 204 base::Bind(&WatcherBackend::StartWatching, |
157 } | 205 base::Unretained(&backend_), |
158 SignalBackgroundThread(); | 206 data)); |
159 return id; | 207 return data.id; |
160 } | 208 } |
161 | 209 |
162 | |
163 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { | 210 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
164 { | 211 // We outlive |thread_|, so it's safe to use Unretained() here. |
165 base::AutoLock lock(lock_); | 212 thread_.message_loop()->PostTask( |
166 // It's possible we've already serviced the handle but HandleWatcher hasn't | 213 FROM_HERE, |
167 // processed it yet. | 214 base::Bind(&WatcherBackend::StopWatching, |
168 IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id); | 215 base::Unretained(&backend_), |
169 if (i == id_to_callback_.end()) | 216 watcher_id)); |
170 return; | |
171 id_to_callback_.erase(i); | |
172 } | |
173 SignalBackgroundThread(); | |
174 } | 217 } |
175 | 218 |
176 WatcherThreadManager::WatcherThreadManager() | 219 WatcherThreadManager::WatcherThreadManager() |
177 : thread_(kWatcherThreadName), | 220 : thread_(kWatcherThreadName) { |
178 quit_(false) { | 221 base::Thread::Options thread_options; |
179 // TODO(sky): deal with error condition? | 222 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); |
180 CHECK_NE(MOJO_HANDLE_INVALID, read_handle()); | 223 thread_.StartWithOptions(thread_options); |
181 thread_.Start(); | |
182 thread_.message_loop()->PostTask( | |
183 FROM_HERE, | |
184 base::Bind(&WatcherThreadManager::RunOnBackgroundThread, | |
185 base::Unretained(this))); | |
186 } | 224 } |
187 | 225 |
188 WatcherThreadManager::~WatcherThreadManager() { | 226 WatcherThreadManager::~WatcherThreadManager() { |
189 { | |
190 base::AutoLock lock(lock_); | |
191 quit_ = true; | |
192 } | |
193 SignalBackgroundThread(); | |
194 | |
195 thread_.Stop(); | 227 thread_.Stop(); |
196 } | 228 } |
197 | 229 |
198 void WatcherThreadManager::RunOnBackgroundThread() { | |
199 while (true) { | |
200 const WaitState state = GetWaitState(); | |
201 for (std::set<WatcherID>::const_iterator i = | |
202 state.deadline_exceeded.begin(); | |
203 i != state.deadline_exceeded.end(); ++i) { | |
204 RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED); | |
205 } | |
206 const MojoResult result = MojoWaitMany(&state.handles.front(), | |
207 &state.wait_flags.front(), | |
208 state.handles.size(), | |
209 state.deadline); | |
210 | |
211 if (result >= 0) { | |
212 DCHECK_LT(result, static_cast<int>(state.handles.size())); | |
213 // Last handle is used to wake us up. | |
214 if (result == static_cast<int>(state.handles.size()) - 1) { | |
215 uint32_t num_bytes = 0; | |
216 MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0, | |
217 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | |
218 { | |
219 base::AutoLock lock(lock_); | |
220 if (quit_) | |
221 return; | |
222 } | |
223 } else { | |
224 RemoveAndNotify(state.ids[result], MOJO_RESULT_OK); | |
225 } | |
226 } else if (result == MOJO_RESULT_INVALID_ARGUMENT || | |
227 result == MOJO_RESULT_FAILED_PRECONDITION) { | |
228 // One of the handles is invalid or the flags supplied is invalid, remove | |
229 // it. | |
230 // Use -1 as last handle is used for communication and should never be | |
231 // invalid. | |
232 for (size_t i = 0; i < state.handles.size() - 1; ++i) { | |
233 const MojoResult result = | |
234 MojoWait(state.handles[i], state.wait_flags[i], 0); | |
235 switch (result) { | |
236 // TODO: do we really want to notify for all these conditions? | |
237 case MOJO_RESULT_OK: | |
238 case MOJO_RESULT_FAILED_PRECONDITION: | |
239 case MOJO_RESULT_INVALID_ARGUMENT: | |
240 RemoveAndNotify(state.ids[i], result); | |
241 break; | |
242 case MOJO_RESULT_DEADLINE_EXCEEDED: | |
243 break; | |
244 default: | |
245 NOTREACHED(); | |
246 } | |
247 } | |
248 } | |
249 } | |
250 } | |
251 | |
252 void WatcherThreadManager::SignalBackgroundThread() { | |
253 // TODO(sky): deal with error? | |
254 MojoWriteMessage(write_handle(), NULL, 0, NULL, 0, | |
255 MOJO_WRITE_MESSAGE_FLAG_NONE); | |
256 } | |
257 | |
258 void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) { | |
259 HandleAndCallback to_notify; | |
260 { | |
261 base::AutoLock lock(lock_); | |
262 IDToCallbackMap::iterator i = id_to_callback_.find(id); | |
263 if (i == id_to_callback_.end()) | |
264 return; | |
265 to_notify = i->second; | |
266 id_to_callback_.erase(i); | |
267 } | |
268 to_notify.message_loop->PostTask(FROM_HERE, | |
269 base::Bind(to_notify.callback, result)); | |
270 } | |
271 | |
272 void WatcherThreadManager::RemoveHandle(MojoHandle handle) { | |
273 { | |
274 base::AutoLock lock(lock_); | |
275 for (IDToCallbackMap::iterator i = id_to_callback_.begin(); | |
276 i != id_to_callback_.end(); ) { | |
277 if (i->second.handle == handle) { | |
278 id_to_callback_.erase(i++); | |
279 } else { | |
280 ++i; | |
281 } | |
282 } | |
283 } | |
284 } | |
285 | |
286 WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() { | |
287 WaitState state; | |
288 const base::TimeTicks now(HandleWatcher::NowTicks()); | |
289 base::TimeDelta deadline; | |
290 { | |
291 base::AutoLock lock(lock_); | |
292 for (IDToCallbackMap::const_iterator i = id_to_callback_.begin(); | |
293 i != id_to_callback_.end(); ++i) { | |
294 if (!i->second.deadline.is_null()) { | |
295 if (i->second.deadline <= now) { | |
296 state.deadline_exceeded.insert(i->first); | |
297 continue; | |
298 } else { | |
299 const base::TimeDelta delta = i->second.deadline - now; | |
300 if (deadline == base::TimeDelta() || delta < deadline) | |
301 deadline = delta; | |
302 } | |
303 } | |
304 state.ids.push_back(i->first); | |
305 state.handles.push_back(i->second.handle); | |
306 state.wait_flags.push_back(i->second.wait_flags); | |
307 } | |
308 } | |
309 state.ids.push_back(0); | |
310 state.handles.push_back(read_handle()); | |
311 state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE); | |
312 state.deadline = (deadline == base::TimeDelta()) ? | |
313 MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds(); | |
314 return state; | |
315 } | |
316 | |
317 } // namespace | 230 } // namespace |
318 | 231 |
319 // HandleWatcher::StartState --------------------------------------------------- | 232 // HandleWatcher::StartState --------------------------------------------------- |
320 | 233 |
321 // Contains the information passed to Start(). | 234 // Contains the information passed to Start(). |
322 struct HandleWatcher::StartState { | 235 struct HandleWatcher::StartState { |
323 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) { | 236 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) { |
324 } | 237 } |
325 | 238 |
326 ~StartState() { | 239 ~StartState() { |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
395 } | 308 } |
396 | 309 |
397 // static | 310 // static |
398 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) { | 311 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) { |
399 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : | 312 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : |
400 NowTicks() + base::TimeDelta::FromMicroseconds(deadline); | 313 NowTicks() + base::TimeDelta::FromMicroseconds(deadline); |
401 } | 314 } |
402 | 315 |
403 } // namespace common | 316 } // namespace common |
404 } // namespace mojo | 317 } // namespace mojo |
OLD | NEW |