Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(232)

Side by Side Diff: mojo/shell/handle_watcher.cc

Issue 64973002: Moves some files into mojo/common (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/shell/handle_watcher.h"
6
7 #include <map>
8 #include <set>
9 #include <vector>
10
11 #include "base/bind.h"
12 #include "base/lazy_instance.h"
13 #include "base/memory/weak_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/time/tick_clock.h"
19 #include "base/time/time.h"
20 #include "mojo/shell/scoped_message_pipe.h"
21
22 namespace mojo {
23 namespace shell {
24
25 typedef int WatcherID;
26
27 namespace {
28
29 const char kWatcherThreadName[] = "handle-watcher-thread";
30
31 // WatcherThreadManager --------------------------------------------------------
32
33 // WatcherThreadManager manages listening for the handles. It is a singleton. It
34 // spawns a thread that waits for any handle passed to StartWatching() to be
35 // ready. Additionally it creates a message pipe for communication between the
36 // two threads. The message pipe is used solely to wake up the background
37 // thread. This happens when the set of handles changes, or during shutdown.
38 class WatcherThreadManager {
39 public:
40 // Returns the shared instance.
41 static WatcherThreadManager* GetInstance();
42
43 // Starts watching the requested handle. Returns a unique ID that is used to
44 // stop watching the handle. When the handle is ready |callback| is notified
45 // on the thread StartWatching() was invoked on.
46 // This may be invoked on any thread.
47 WatcherID StartWatching(MojoHandle handle,
48 MojoWaitFlags wait_flags,
49 base::TimeTicks deadline,
50 const base::Closure& callback);
51
52 // Stops watching a handle.
53 void StopWatching(WatcherID watcher_id);
54
55 private:
56 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
57
58 // Tracks a single request.
59 struct HandleAndCallback {
60 HandleAndCallback()
61 : handle(MOJO_HANDLE_INVALID),
62 wait_flags(MOJO_WAIT_FLAG_NONE),
63 message_loop(NULL) {}
64
65 MojoHandle handle;
66 MojoWaitFlags wait_flags;
67 base::TimeTicks deadline;
68 base::Closure callback;
69 scoped_refptr<base::MessageLoopProxy> message_loop;
70 };
71
72 // Contains the state needed for MojoWaitMany.
73 // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to
74 // pass to MojoWaitMany.
75 struct WaitState {
76 // List of ids.
77 std::vector<WatcherID> ids;
78
79 // List of handles.
80 std::vector<MojoHandle> handles;
81
82 // List of flags each handle is waiting on.
83 std::vector<MojoWaitFlags> wait_flags;
84
85 // First deadline.
86 MojoDeadline deadline;
87
88 // Set of ids whose deadline has been reached.
89 std::set<WatcherID> deadline_exceeded;
90 };
91
92 typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap;
93
94 WatcherThreadManager();
95 ~WatcherThreadManager();
96
97 // Invoked on the background thread. Runs a loop waiting on current set of
98 // handles.
99 void RunOnBackgroundThread();
100
101 // Writes to the communication pipe to wake up the background thread.
102 void SignalBackgroundThread();
103
104 // Invoked when a handle associated with |id| should be removed and notified.
105 // |result| gives the reason for removing.
106 void RemoveAndNotify(WatcherID id, MojoResult result);
107
108 // Removes all callbacks schedule for |handle|. This is used when a handle
109 // is identified as invalid.
110 void RemoveHandle(MojoHandle handle);
111
112 MojoHandle read_handle() const { return control_pipe_.handle_0(); }
113 MojoHandle write_handle() const { return control_pipe_.handle_1(); }
114
115 // Returns state needed for MojoWaitMany.
116 WaitState GetWaitState();
117
118 // Guards members accessed on both threads.
119 base::Lock lock_;
120
121 // Used for communicating with the background thread.
122 ScopedMessagePipe control_pipe_;
123
124 base::Thread thread_;
125
126 // Maps from assigned id to the callback.
127 IDToCallbackMap id_to_callback_;
128
129 // If true the background loop should exit.
130 bool quit_;
131
132 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
133 };
134
135 WatcherThreadManager* WatcherThreadManager::GetInstance() {
136 static base::LazyInstance<WatcherThreadManager> instance =
137 LAZY_INSTANCE_INITIALIZER;
138 return &instance.Get();
139 }
140
141 WatcherID WatcherThreadManager::StartWatching(MojoHandle handle,
142 MojoWaitFlags wait_flags,
143 base::TimeTicks deadline,
144 const base::Closure& callback) {
145 WatcherID id = 0;
146 {
147 static int next_id = 0;
148 base::AutoLock lock(lock_);
149 // TODO(sky): worry about overflow?
150 id = ++next_id;
151 id_to_callback_[id].handle = handle;
152 id_to_callback_[id].callback = callback;
153 id_to_callback_[id].wait_flags = wait_flags;
154 id_to_callback_[id].deadline = deadline;
155 id_to_callback_[id].message_loop = base::MessageLoopProxy::current();
156 }
157 SignalBackgroundThread();
158 return id;
159 }
160
161
162 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
163 {
164 base::AutoLock lock(lock_);
165 // It's possible we've already serviced the handle but HandleWatcher hasn't
166 // processed it yet.
167 IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id);
168 if (i == id_to_callback_.end())
169 return;
170 id_to_callback_.erase(i);
171 }
172 SignalBackgroundThread();
173 }
174
175 WatcherThreadManager::WatcherThreadManager()
176 : thread_(kWatcherThreadName),
177 quit_(false) {
178 // TODO(sky): deal with error condition?
179 CHECK_NE(MOJO_HANDLE_INVALID, read_handle());
180 thread_.Start();
181 thread_.message_loop()->PostTask(
182 FROM_HERE,
183 base::Bind(&WatcherThreadManager::RunOnBackgroundThread,
184 base::Unretained(this)));
185 }
186
187 WatcherThreadManager::~WatcherThreadManager() {
188 {
189 base::AutoLock lock(lock_);
190 quit_ = true;
191 }
192 SignalBackgroundThread();
193
194 thread_.Stop();
195 }
196
197 void WatcherThreadManager::RunOnBackgroundThread() {
198 while (true) {
199 const WaitState state = GetWaitState();
200 for (std::set<WatcherID>::const_iterator i =
201 state.deadline_exceeded.begin();
202 i != state.deadline_exceeded.end(); ++i) {
203 RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED);
204 }
205 const MojoResult result = MojoWaitMany(&state.handles.front(),
206 &state.wait_flags.front(),
207 state.handles.size(),
208 state.deadline);
209
210 if (result >= 0) {
211 DCHECK_LT(result, static_cast<int>(state.handles.size()));
212 // Last handle is used to wake us up.
213 if (result == static_cast<int>(state.handles.size()) - 1) {
214 uint32_t num_bytes = 0;
215 MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0,
216 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
217 {
218 base::AutoLock lock(lock_);
219 if (quit_)
220 return;
221 }
222 } else {
223 RemoveAndNotify(state.ids[result], MOJO_RESULT_OK);
224 }
225 } else if (result == MOJO_RESULT_INVALID_ARGUMENT ||
226 result == MOJO_RESULT_FAILED_PRECONDITION) {
227 // One of the handles is invalid or the flags supplied is invalid, remove
228 // it.
229 // Use -1 as last handle is used for communication and should never be
230 // invalid.
231 for (size_t i = 0; i < state.handles.size() - 1; ++i) {
232 const MojoResult result =
233 MojoWait(state.handles[i], state.wait_flags[i], 0);
234 switch (result) {
235 // TODO: do we really want to notify for all these conditions?
236 case MOJO_RESULT_OK:
237 case MOJO_RESULT_FAILED_PRECONDITION:
238 case MOJO_RESULT_INVALID_ARGUMENT:
239 RemoveAndNotify(state.ids[i], result);
240 break;
241 case MOJO_RESULT_DEADLINE_EXCEEDED:
242 break;
243 default:
244 NOTREACHED();
245 }
246 }
247 }
248 }
249 }
250
251 void WatcherThreadManager::SignalBackgroundThread() {
252 // TODO(sky): deal with error?
253 MojoWriteMessage(write_handle(), NULL, 0, NULL, 0,
254 MOJO_WRITE_MESSAGE_FLAG_NONE);
255 }
256
257 void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) {
258 HandleAndCallback to_notify;
259 {
260 base::AutoLock lock(lock_);
261 IDToCallbackMap::iterator i = id_to_callback_.find(id);
262 if (i == id_to_callback_.end())
263 return;
264 to_notify = i->second;
265 id_to_callback_.erase(i);
266 }
267 to_notify.message_loop->PostTask(FROM_HERE, to_notify.callback);
268 }
269
270 void WatcherThreadManager::RemoveHandle(MojoHandle handle) {
271 {
272 base::AutoLock lock(lock_);
273 for (IDToCallbackMap::iterator i = id_to_callback_.begin();
274 i != id_to_callback_.end(); ) {
275 if (i->second.handle == handle) {
276 id_to_callback_.erase(i++);
277 } else {
278 ++i;
279 }
280 }
281 }
282 }
283
284 WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() {
285 WaitState state;
286 const base::TimeTicks now(HandleWatcher::NowTicks());
287 base::TimeDelta deadline;
288 {
289 base::AutoLock lock(lock_);
290 for (IDToCallbackMap::const_iterator i = id_to_callback_.begin();
291 i != id_to_callback_.end(); ++i) {
292 if (!i->second.deadline.is_null()) {
293 if (i->second.deadline <= now) {
294 state.deadline_exceeded.insert(i->first);
295 continue;
296 } else {
297 const base::TimeDelta delta = i->second.deadline - now;
298 if (deadline == base::TimeDelta() || delta < deadline)
299 deadline = delta;
300 }
301 }
302 state.ids.push_back(i->first);
303 state.handles.push_back(i->second.handle);
304 state.wait_flags.push_back(i->second.wait_flags);
305 }
306 }
307 state.ids.push_back(0);
308 state.handles.push_back(read_handle());
309 state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE);
310 state.deadline = (deadline == base::TimeDelta()) ?
311 MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds();
312 return state;
313 }
314
315 } // namespace
316
317 // HandleWatcher::StartState ---------------------------------------------------
318
319 // Contains the information passed to Start().
320 struct HandleWatcher::StartState {
321 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
322 }
323
324 ~StartState() {
325 }
326
327 // ID assigned by WatcherThreadManager.
328 WatcherID watcher_id;
329
330 // Callback to notify when done.
331 base::Closure callback;
332
333 // When Start() is invoked a callback is passed to WatcherThreadManager
334 // using a WeakRef from |weak_refactory_|. The callback invokes
335 // OnHandleReady() (on the thread Start() is invoked from) which in turn
336 // notifies |callback_|. Doing this allows us to reset state when the handle
337 // is ready, and then notify the callback. Doing this also means Stop()
338 // cancels any pending callbacks that may be inflight.
339 base::WeakPtrFactory<HandleWatcher> weak_factory;
340 };
341
342 // HandleWatcher ---------------------------------------------------------------
343
344 // static
345 base::TickClock* HandleWatcher::tick_clock_ = NULL;
346
347 HandleWatcher::HandleWatcher() {
348 }
349
350 HandleWatcher::~HandleWatcher() {
351 Stop();
352 }
353
354 void HandleWatcher::Start(MojoHandle handle,
355 MojoWaitFlags wait_flags,
356 MojoDeadline deadline,
357 const base::Closure& callback) {
358 DCHECK_NE(MOJO_HANDLE_INVALID, handle);
359 DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
360
361 Stop();
362
363 start_state_.reset(new StartState(this));
364 start_state_->callback = callback;
365 start_state_->watcher_id =
366 WatcherThreadManager::GetInstance()->StartWatching(
367 handle,
368 wait_flags,
369 MojoDeadlineToTimeTicks(deadline),
370 base::Bind(&HandleWatcher::OnHandleReady,
371 start_state_->weak_factory.GetWeakPtr()));
372 }
373
374 void HandleWatcher::Stop() {
375 if (!start_state_.get())
376 return;
377
378 scoped_ptr<StartState> old_state(start_state_.Pass());
379 WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
380 }
381
382 void HandleWatcher::OnHandleReady() {
383 DCHECK(start_state_.get());
384 scoped_ptr<StartState> old_state(start_state_.Pass());
385 old_state->callback.Run();
386 }
387
388 // static
389 base::TimeTicks HandleWatcher::NowTicks() {
390 return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
391 }
392
393 // static
394 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
395 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
396 NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
397 }
398
399 } // namespace shell
400 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698