Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(212)

Side by Side Diff: mojo/common/handle_watcher.cc

Issue 69883008: Implements HandleWatcher in terms of MessagePumpMojo (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: add id to detect whether should notify Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | mojo/common/message_pump_mojo.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/common/handle_watcher.h" 5 #include "mojo/common/handle_watcher.h"
6 6
7 #include <map> 7 #include <map>
8 #include <set>
9 #include <vector>
10 8
9 #include "base/atomic_sequence_num.h"
11 #include "base/bind.h" 10 #include "base/bind.h"
12 #include "base/lazy_instance.h" 11 #include "base/lazy_instance.h"
13 #include "base/memory/weak_ptr.h" 12 #include "base/memory/weak_ptr.h"
14 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
15 #include "base/message_loop/message_loop_proxy.h" 14 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h" 15 #include "base/threading/thread.h"
18 #include "base/time/tick_clock.h" 16 #include "base/time/tick_clock.h"
19 #include "base/time/time.h" 17 #include "base/time/time.h"
20 #include "mojo/common/scoped_message_pipe.h" 18 #include "mojo/common/message_pump_mojo.h"
19 #include "mojo/common/message_pump_mojo_handler.h"
21 20
22 namespace mojo { 21 namespace mojo {
23 namespace common { 22 namespace common {
24 23
25 typedef int WatcherID; 24 typedef int WatcherID;
26 25
27 namespace { 26 namespace {
28 27
29 const char kWatcherThreadName[] = "handle-watcher-thread"; 28 const char kWatcherThreadName[] = "handle-watcher-thread";
30 29
30 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
31 MessagePumpMojo* message_pump_mojo = NULL;
32
33 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
34 message_pump_mojo = new MessagePumpMojo;
35 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
36 }
37
38 // Tracks the data for a single call to Start().
39 struct WatchData {
40 WatchData()
41 : id(0),
42 handle(MOJO_HANDLE_INVALID),
43 wait_flags(MOJO_WAIT_FLAG_NONE),
44 message_loop(NULL) {}
45
46 WatcherID id;
47 MojoHandle handle;
48 MojoWaitFlags wait_flags;
49 base::TimeTicks deadline;
50 base::Callback<void(MojoResult)> callback;
51 scoped_refptr<base::MessageLoopProxy> message_loop;
52 };
53
54 // WatcherBackend --------------------------------------------------------------
55
56 // WatcherBackend is responsible for managing the requests and interacting with
57 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
58 // thread WatcherThreadManager creates.
59 class WatcherBackend : public MessagePumpMojoHandler {
60 public:
61 WatcherBackend();
62 virtual ~WatcherBackend();
63
64 void StartWatching(const WatchData& data);
65 void StopWatching(WatcherID watcher_id);
66
67 private:
68 typedef std::map<MojoHandle, WatchData> HandleToWatchDataMap;
69
70 // Invoked when a handle needs to be removed and notified.
71 void RemoveAndNotify(MojoHandle handle, MojoResult result);
72
73 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
74 // and sets |handle| to the MojoHandle. Returns false if not a known id.
75 bool GetMojoHandleByWatcherID(WatcherID watcher_id, MojoHandle* handle) const;
76
77 // MessagePumpMojoHandler overrides:
78 virtual void OnHandleReady(MojoHandle handle) OVERRIDE;
79 virtual void OnHandleError(MojoHandle handle, MojoResult result) OVERRIDE;
80
81 // Maps from assigned id to WatchData.
82 HandleToWatchDataMap handle_to_data_;
83
84 DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
85 };
86
87 WatcherBackend::WatcherBackend() {
88 }
89
90 WatcherBackend::~WatcherBackend() {
91 }
92
93 void WatcherBackend::StartWatching(const WatchData& data) {
94 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
95
96 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
97
98 handle_to_data_[data.handle] = data;
99 message_pump_mojo->AddHandler(this, data.handle,
100 data.wait_flags,
101 data.deadline);
102 }
103
104 void WatcherBackend::StopWatching(WatcherID watcher_id) {
105 // Because of the thread hop it is entirely possible to get here and not
106 // have a valid handle registered for |watcher_id|.
107 MojoHandle handle;
108 if (!GetMojoHandleByWatcherID(watcher_id, &handle))
109 return;
110
111 handle_to_data_.erase(handle);
112 message_pump_mojo->RemoveHandler(handle);
113 }
114
115 void WatcherBackend::RemoveAndNotify(MojoHandle handle,
116 MojoResult result) {
117 if (handle_to_data_.count(handle) == 0)
118 return;
119
120 const WatchData data(handle_to_data_[handle]);
121 handle_to_data_.erase(handle);
122 message_pump_mojo->RemoveHandler(handle);
123 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
124 }
125
126 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
127 MojoHandle* handle) const {
128 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
129 i != handle_to_data_.end(); ++i) {
130 if (i->second.id == watcher_id) {
131 *handle = i->second.handle;
132 return true;
133 }
134 }
135 return false;
136 }
137
138 void WatcherBackend::OnHandleReady(MojoHandle handle) {
139 RemoveAndNotify(handle, MOJO_RESULT_OK);
140 }
141
142 void WatcherBackend::OnHandleError(MojoHandle handle, MojoResult result) {
143 RemoveAndNotify(handle, result);
144 }
145
31 // WatcherThreadManager -------------------------------------------------------- 146 // WatcherThreadManager --------------------------------------------------------
32 147
33 // WatcherThreadManager manages listening for the handles. It is a singleton. It 148 // WatcherThreadManager manages the background thread that listens for handles
34 // spawns a thread that waits for any handle passed to StartWatching() to be 149 // to be ready. All requests are handled by WatcherBackend.
35 // ready. Additionally it creates a message pipe for communication between the
36 // two threads. The message pipe is used solely to wake up the background
37 // thread. This happens when the set of handles changes, or during shutdown.
38 class WatcherThreadManager { 150 class WatcherThreadManager {
39 public: 151 public:
40 // Returns the shared instance. 152 // Returns the shared instance.
41 static WatcherThreadManager* GetInstance(); 153 static WatcherThreadManager* GetInstance();
42 154
43 // Starts watching the requested handle. Returns a unique ID that is used to 155 // Starts watching the requested handle. Returns a unique ID that is used to
44 // stop watching the handle. When the handle is ready |callback| is notified 156 // stop watching the handle. When the handle is ready |callback| is notified
45 // on the thread StartWatching() was invoked on. 157 // on the thread StartWatching() was invoked on.
46 // This may be invoked on any thread. 158 // This may be invoked on any thread.
47 WatcherID StartWatching(MojoHandle handle, 159 WatcherID StartWatching(MojoHandle handle,
48 MojoWaitFlags wait_flags, 160 MojoWaitFlags wait_flags,
49 base::TimeTicks deadline, 161 base::TimeTicks deadline,
50 const base::Callback<void(MojoResult)>& callback); 162 const base::Callback<void(MojoResult)>& callback);
51 163
52 // Stops watching a handle. 164 // Stops watching a handle.
165 // This may be invoked on any thread.
53 void StopWatching(WatcherID watcher_id); 166 void StopWatching(WatcherID watcher_id);
54 167
55 private: 168 private:
56 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; 169 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
57 170
58 // Tracks a single request.
59 struct HandleAndCallback {
60 HandleAndCallback()
61 : handle(MOJO_HANDLE_INVALID),
62 wait_flags(MOJO_WAIT_FLAG_NONE),
63 message_loop(NULL) {}
64
65 MojoHandle handle;
66 MojoWaitFlags wait_flags;
67 base::TimeTicks deadline;
68 base::Callback<void(MojoResult)> callback;
69 scoped_refptr<base::MessageLoopProxy> message_loop;
70 };
71
72 // Contains the state needed for MojoWaitMany.
73 // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to
74 // pass to MojoWaitMany.
75 struct WaitState {
76 // List of ids.
77 std::vector<WatcherID> ids;
78
79 // List of handles.
80 std::vector<MojoHandle> handles;
81
82 // List of flags each handle is waiting on.
83 std::vector<MojoWaitFlags> wait_flags;
84
85 // First deadline.
86 MojoDeadline deadline;
87
88 // Set of ids whose deadline has been reached.
89 std::set<WatcherID> deadline_exceeded;
90 };
91
92 typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap;
93
94 WatcherThreadManager(); 171 WatcherThreadManager();
95 ~WatcherThreadManager(); 172 ~WatcherThreadManager();
96 173
97 // Invoked on the background thread. Runs a loop waiting on current set of
98 // handles.
99 void RunOnBackgroundThread();
100
101 // Writes to the communication pipe to wake up the background thread.
102 void SignalBackgroundThread();
103
104 // Invoked when a handle associated with |id| should be removed and notified.
105 // |result| gives the reason for removing.
106 void RemoveAndNotify(WatcherID id, MojoResult result);
107
108 // Removes all callbacks schedule for |handle|. This is used when a handle
109 // is identified as invalid.
110 void RemoveHandle(MojoHandle handle);
111
112 MojoHandle read_handle() const { return control_pipe_.handle_0(); }
113 MojoHandle write_handle() const { return control_pipe_.handle_1(); }
114
115 // Returns state needed for MojoWaitMany.
116 WaitState GetWaitState();
117
118 // Guards members accessed on both threads.
119 base::Lock lock_;
120
121 // Used for communicating with the background thread.
122 ScopedMessagePipe control_pipe_;
123
124 base::Thread thread_; 174 base::Thread thread_;
125 175
126 // Maps from assigned id to the callback. 176 base::AtomicSequenceNumber watcher_id_generator_;
127 IDToCallbackMap id_to_callback_;
128 177
129 // If true the background loop should exit. 178 WatcherBackend backend_;
130 bool quit_;
131 179
132 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); 180 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
133 }; 181 };
134 182
135 WatcherThreadManager* WatcherThreadManager::GetInstance() { 183 WatcherThreadManager* WatcherThreadManager::GetInstance() {
136 static base::LazyInstance<WatcherThreadManager> instance = 184 static base::LazyInstance<WatcherThreadManager> instance =
137 LAZY_INSTANCE_INITIALIZER; 185 LAZY_INSTANCE_INITIALIZER;
138 return &instance.Get(); 186 return &instance.Get();
139 } 187 }
140 188
141 WatcherID WatcherThreadManager::StartWatching( 189 WatcherID WatcherThreadManager::StartWatching(
142 MojoHandle handle, 190 MojoHandle handle,
143 MojoWaitFlags wait_flags, 191 MojoWaitFlags wait_flags,
144 base::TimeTicks deadline, 192 base::TimeTicks deadline,
145 const base::Callback<void(MojoResult)>& callback) { 193 const base::Callback<void(MojoResult)>& callback) {
146 WatcherID id = 0; 194 WatchData data;
147 { 195 data.id = watcher_id_generator_.GetNext();
148 static int next_id = 0; 196 data.handle = handle;
149 base::AutoLock lock(lock_); 197 data.callback = callback;
150 // TODO(sky): worry about overflow? 198 data.wait_flags = wait_flags;
151 id = ++next_id; 199 data.deadline = deadline;
152 id_to_callback_[id].handle = handle; 200 data.message_loop = base::MessageLoopProxy::current();
153 id_to_callback_[id].callback = callback; 201 // We outlive |thread_|, so it's safe to use Unretained() here.
154 id_to_callback_[id].wait_flags = wait_flags; 202 thread_.message_loop()->PostTask(
155 id_to_callback_[id].deadline = deadline; 203 FROM_HERE,
156 id_to_callback_[id].message_loop = base::MessageLoopProxy::current(); 204 base::Bind(&WatcherBackend::StartWatching,
157 } 205 base::Unretained(&backend_),
158 SignalBackgroundThread(); 206 data));
159 return id; 207 return data.id;
160 } 208 }
161 209
162
163 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { 210 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
164 { 211 // We outlive |thread_|, so it's safe to use Unretained() here.
165 base::AutoLock lock(lock_); 212 thread_.message_loop()->PostTask(
166 // It's possible we've already serviced the handle but HandleWatcher hasn't 213 FROM_HERE,
167 // processed it yet. 214 base::Bind(&WatcherBackend::StopWatching,
168 IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id); 215 base::Unretained(&backend_),
169 if (i == id_to_callback_.end()) 216 watcher_id));
170 return;
171 id_to_callback_.erase(i);
172 }
173 SignalBackgroundThread();
174 } 217 }
175 218
176 WatcherThreadManager::WatcherThreadManager() 219 WatcherThreadManager::WatcherThreadManager()
177 : thread_(kWatcherThreadName), 220 : thread_(kWatcherThreadName) {
178 quit_(false) { 221 base::Thread::Options thread_options;
179 // TODO(sky): deal with error condition? 222 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
180 CHECK_NE(MOJO_HANDLE_INVALID, read_handle()); 223 thread_.StartWithOptions(thread_options);
181 thread_.Start();
182 thread_.message_loop()->PostTask(
183 FROM_HERE,
184 base::Bind(&WatcherThreadManager::RunOnBackgroundThread,
185 base::Unretained(this)));
186 } 224 }
187 225
188 WatcherThreadManager::~WatcherThreadManager() { 226 WatcherThreadManager::~WatcherThreadManager() {
189 {
190 base::AutoLock lock(lock_);
191 quit_ = true;
192 }
193 SignalBackgroundThread();
194
195 thread_.Stop(); 227 thread_.Stop();
196 } 228 }
197 229
198 void WatcherThreadManager::RunOnBackgroundThread() {
199 while (true) {
200 const WaitState state = GetWaitState();
201 for (std::set<WatcherID>::const_iterator i =
202 state.deadline_exceeded.begin();
203 i != state.deadline_exceeded.end(); ++i) {
204 RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED);
205 }
206 const MojoResult result = MojoWaitMany(&state.handles.front(),
207 &state.wait_flags.front(),
208 state.handles.size(),
209 state.deadline);
210
211 if (result >= 0) {
212 DCHECK_LT(result, static_cast<int>(state.handles.size()));
213 // Last handle is used to wake us up.
214 if (result == static_cast<int>(state.handles.size()) - 1) {
215 uint32_t num_bytes = 0;
216 MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0,
217 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
218 {
219 base::AutoLock lock(lock_);
220 if (quit_)
221 return;
222 }
223 } else {
224 RemoveAndNotify(state.ids[result], MOJO_RESULT_OK);
225 }
226 } else if (result == MOJO_RESULT_INVALID_ARGUMENT ||
227 result == MOJO_RESULT_FAILED_PRECONDITION) {
228 // One of the handles is invalid or the flags supplied is invalid, remove
229 // it.
230 // Use -1 as last handle is used for communication and should never be
231 // invalid.
232 for (size_t i = 0; i < state.handles.size() - 1; ++i) {
233 const MojoResult result =
234 MojoWait(state.handles[i], state.wait_flags[i], 0);
235 switch (result) {
236 // TODO: do we really want to notify for all these conditions?
237 case MOJO_RESULT_OK:
238 case MOJO_RESULT_FAILED_PRECONDITION:
239 case MOJO_RESULT_INVALID_ARGUMENT:
240 RemoveAndNotify(state.ids[i], result);
241 break;
242 case MOJO_RESULT_DEADLINE_EXCEEDED:
243 break;
244 default:
245 NOTREACHED();
246 }
247 }
248 }
249 }
250 }
251
252 void WatcherThreadManager::SignalBackgroundThread() {
253 // TODO(sky): deal with error?
254 MojoWriteMessage(write_handle(), NULL, 0, NULL, 0,
255 MOJO_WRITE_MESSAGE_FLAG_NONE);
256 }
257
258 void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) {
259 HandleAndCallback to_notify;
260 {
261 base::AutoLock lock(lock_);
262 IDToCallbackMap::iterator i = id_to_callback_.find(id);
263 if (i == id_to_callback_.end())
264 return;
265 to_notify = i->second;
266 id_to_callback_.erase(i);
267 }
268 to_notify.message_loop->PostTask(FROM_HERE,
269 base::Bind(to_notify.callback, result));
270 }
271
272 void WatcherThreadManager::RemoveHandle(MojoHandle handle) {
273 {
274 base::AutoLock lock(lock_);
275 for (IDToCallbackMap::iterator i = id_to_callback_.begin();
276 i != id_to_callback_.end(); ) {
277 if (i->second.handle == handle) {
278 id_to_callback_.erase(i++);
279 } else {
280 ++i;
281 }
282 }
283 }
284 }
285
286 WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() {
287 WaitState state;
288 const base::TimeTicks now(HandleWatcher::NowTicks());
289 base::TimeDelta deadline;
290 {
291 base::AutoLock lock(lock_);
292 for (IDToCallbackMap::const_iterator i = id_to_callback_.begin();
293 i != id_to_callback_.end(); ++i) {
294 if (!i->second.deadline.is_null()) {
295 if (i->second.deadline <= now) {
296 state.deadline_exceeded.insert(i->first);
297 continue;
298 } else {
299 const base::TimeDelta delta = i->second.deadline - now;
300 if (deadline == base::TimeDelta() || delta < deadline)
301 deadline = delta;
302 }
303 }
304 state.ids.push_back(i->first);
305 state.handles.push_back(i->second.handle);
306 state.wait_flags.push_back(i->second.wait_flags);
307 }
308 }
309 state.ids.push_back(0);
310 state.handles.push_back(read_handle());
311 state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE);
312 state.deadline = (deadline == base::TimeDelta()) ?
313 MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds();
314 return state;
315 }
316
317 } // namespace 230 } // namespace
318 231
319 // HandleWatcher::StartState --------------------------------------------------- 232 // HandleWatcher::StartState ---------------------------------------------------
320 233
321 // Contains the information passed to Start(). 234 // Contains the information passed to Start().
322 struct HandleWatcher::StartState { 235 struct HandleWatcher::StartState {
323 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) { 236 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
324 } 237 }
325 238
326 ~StartState() { 239 ~StartState() {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
395 } 308 }
396 309
397 // static 310 // static
398 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) { 311 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
399 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 312 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
400 NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 313 NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
401 } 314 }
402 315
403 } // namespace common 316 } // namespace common
404 } // namespace mojo 317 } // namespace mojo
OLDNEW
« no previous file with comments | « no previous file | mojo/common/message_pump_mojo.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698