Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(44)

Side by Side Diff: mojo/common/handle_watcher.cc

Issue 480293004: Performance tuning of HandleWatcher (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: better comment Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/common/handle_watcher.h" 5 #include "mojo/common/handle_watcher.h"
6 6
7 #include <map> 7 #include <map>
8 8
9 #include "base/atomic_sequence_num.h" 9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
64 // WatcherBackend is responsible for managing the requests and interacting with 64 // WatcherBackend is responsible for managing the requests and interacting with
65 // MessagePumpMojo. All access (outside of creation/destruction) is done on the 65 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
66 // thread WatcherThreadManager creates. 66 // thread WatcherThreadManager creates.
67 class WatcherBackend : public MessagePumpMojoHandler { 67 class WatcherBackend : public MessagePumpMojoHandler {
68 public: 68 public:
69 WatcherBackend(); 69 WatcherBackend();
70 virtual ~WatcherBackend(); 70 virtual ~WatcherBackend();
71 71
72 void StartWatching(const WatchData& data); 72 void StartWatching(const WatchData& data);
73 73
74 // Cancels a previously schedule request to start a watch. When done signals 74 // Cancels a previously scheduled request to start a watch.
75 // |event|. 75 void StopWatching(WatcherID watcher_id);
76 void StopWatching(WatcherID watcher_id, base::WaitableEvent* event);
77 76
78 private: 77 private:
79 typedef std::map<Handle, WatchData> HandleToWatchDataMap; 78 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
80 79
81 // Invoked when a handle needs to be removed and notified. 80 // Invoked when a handle needs to be removed and notified.
82 void RemoveAndNotify(const Handle& handle, MojoResult result); 81 void RemoveAndNotify(const Handle& handle, MojoResult result);
83 82
84 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found 83 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
85 // and sets |handle| to the Handle. Returns false if not a known id. 84 // and sets |handle| to the Handle. Returns false if not a known id.
86 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; 85 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
(...skipping 18 matching lines...) Expand all
105 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 104 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
106 105
107 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 106 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
108 107
109 handle_to_data_[data.handle] = data; 108 handle_to_data_[data.handle] = data;
110 message_pump_mojo->AddHandler(this, data.handle, 109 message_pump_mojo->AddHandler(this, data.handle,
111 data.handle_signals, 110 data.handle_signals,
112 data.deadline); 111 data.deadline);
113 } 112 }
114 113
115 void WatcherBackend::StopWatching(WatcherID watcher_id, 114 void WatcherBackend::StopWatching(WatcherID watcher_id) {
116 base::WaitableEvent* event) {
117 // Because of the thread hop it is entirely possible to get here and not 115 // Because of the thread hop it is entirely possible to get here and not
118 // have a valid handle registered for |watcher_id|. 116 // have a valid handle registered for |watcher_id|.
119 Handle handle; 117 Handle handle;
120 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { 118 if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
121 handle_to_data_.erase(handle); 119 handle_to_data_.erase(handle);
122 message_pump_mojo->RemoveHandler(handle); 120 message_pump_mojo->RemoveHandler(handle);
123 } 121 }
124 event->Signal();
125 } 122 }
126 123
127 void WatcherBackend::RemoveAndNotify(const Handle& handle, 124 void WatcherBackend::RemoveAndNotify(const Handle& handle,
128 MojoResult result) { 125 MojoResult result) {
129 if (handle_to_data_.count(handle) == 0) 126 if (handle_to_data_.count(handle) == 0)
130 return; 127 return;
131 128
132 const WatchData data(handle_to_data_[handle]); 129 const WatchData data(handle_to_data_[handle]);
133 handle_to_data_.erase(handle); 130 handle_to_data_.erase(handle);
134 message_pump_mojo->RemoveHandler(handle); 131 message_pump_mojo->RemoveHandler(handle);
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
175 WatcherID StartWatching(const Handle& handle, 172 WatcherID StartWatching(const Handle& handle,
176 MojoHandleSignals handle_signals, 173 MojoHandleSignals handle_signals,
177 base::TimeTicks deadline, 174 base::TimeTicks deadline,
178 const base::Callback<void(MojoResult)>& callback); 175 const base::Callback<void(MojoResult)>& callback);
179 176
180 // Stops watching a handle. 177 // Stops watching a handle.
181 // This may be invoked on any thread. 178 // This may be invoked on any thread.
182 void StopWatching(WatcherID watcher_id); 179 void StopWatching(WatcherID watcher_id);
183 180
184 private: 181 private:
182 enum RequestType {
183 REQUEST_START,
184 REQUEST_STOP,
185 };
186
187 // See description of |requests_| for details.
188 struct RequestData {
189 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
190
191 RequestType type;
192 WatchData start_data;
193 WatcherID stop_id;
194 base::WaitableEvent* stop_event;
195 };
196
197 typedef std::vector<RequestData> Requests;
198
185 friend struct DefaultSingletonTraits<WatcherThreadManager>; 199 friend struct DefaultSingletonTraits<WatcherThreadManager>;
200
186 WatcherThreadManager(); 201 WatcherThreadManager();
187 202
203 // Schedules a request on the background thread. See |requests_| for details.
204 void AddRequest(const RequestData& data);
205
206 // Processes requests added to |requests_|. This is invoked on the backend
207 // thread.
208 void ProcessRequestsOnBackendThread();
209
188 base::Thread thread_; 210 base::Thread thread_;
189 211
190 base::AtomicSequenceNumber watcher_id_generator_; 212 base::AtomicSequenceNumber watcher_id_generator_;
191 213
192 WatcherBackend backend_; 214 WatcherBackend backend_;
193 215
216 // Protects |requests_|.
217 base::Lock lock_;
218
219 // Start/Stop result in adding a RequestData to |requests_| (protected by
220 // |lock_|). When the background thread wakes up it processes the requests.
221 Requests requests_;
222
194 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); 223 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
195 }; 224 };
196 225
197 WatcherThreadManager::~WatcherThreadManager() { 226 WatcherThreadManager::~WatcherThreadManager() {
198 thread_.Stop(); 227 thread_.Stop();
199 } 228 }
200 229
201 WatcherThreadManager* WatcherThreadManager::GetInstance() { 230 WatcherThreadManager* WatcherThreadManager::GetInstance() {
202 return Singleton<WatcherThreadManager>::get(); 231 return Singleton<WatcherThreadManager>::get();
203 } 232 }
204 233
205 WatcherID WatcherThreadManager::StartWatching( 234 WatcherID WatcherThreadManager::StartWatching(
206 const Handle& handle, 235 const Handle& handle,
207 MojoHandleSignals handle_signals, 236 MojoHandleSignals handle_signals,
208 base::TimeTicks deadline, 237 base::TimeTicks deadline,
209 const base::Callback<void(MojoResult)>& callback) { 238 const base::Callback<void(MojoResult)>& callback) {
210 WatchData data; 239 RequestData request_data;
211 data.id = watcher_id_generator_.GetNext(); 240 request_data.type = REQUEST_START;
212 data.handle = handle; 241 request_data.start_data.id = watcher_id_generator_.GetNext();
213 data.callback = callback; 242 request_data.start_data.handle = handle;
214 data.handle_signals = handle_signals; 243 request_data.start_data.callback = callback;
215 data.deadline = deadline; 244 request_data.start_data.handle_signals = handle_signals;
216 data.message_loop = base::MessageLoopProxy::current(); 245 request_data.start_data.deadline = deadline;
246 request_data.start_data.message_loop = base::MessageLoopProxy::current();
217 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), 247 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
218 data.message_loop.get()); 248 request_data.start_data.message_loop.get());
219 // We own |thread_|, so it's safe to use Unretained() here. 249 AddRequest(request_data);
220 thread_.message_loop()->PostTask( 250 return request_data.start_data.id;
221 FROM_HERE,
222 base::Bind(&WatcherBackend::StartWatching,
223 base::Unretained(&backend_),
224 data));
225 return data.id;
226 } 251 }
227 252
228 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { 253 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
254 // Handle the case of StartWatching() followed by StopWatching() before
255 // |thread_| woke up.
256 {
257 base::AutoLock auto_lock(lock_);
258 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
259 if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
260 // Watcher ids are not reused, so if we find it we can stop.
261 requests_.erase(i);
262 return;
263 }
264 }
265 }
266
229 base::ThreadRestrictions::ScopedAllowWait allow_wait; 267 base::ThreadRestrictions::ScopedAllowWait allow_wait;
230 base::WaitableEvent event(true, false); 268 base::WaitableEvent event(true, false);
231 // We own |thread_|, so it's safe to use Unretained() here. 269 RequestData request_data;
232 thread_.message_loop()->PostTask( 270 request_data.type = REQUEST_STOP;
233 FROM_HERE, 271 request_data.stop_id = watcher_id;
234 base::Bind(&WatcherBackend::StopWatching, 272 request_data.stop_event = &event;
235 base::Unretained(&backend_), 273 AddRequest(request_data);
236 watcher_id,
237 &event));
238 274
239 // We need to block until the handle is actually removed. 275 // We need to block until the handle is actually removed.
240 event.Wait(); 276 event.Wait();
241 } 277 }
242 278
279 void WatcherThreadManager::AddRequest(const RequestData& data) {
280 {
281 base::AutoLock auto_lock(lock_);
282 const bool was_empty = requests_.empty();
283 requests_.push_back(data);
284 if (!was_empty)
285 return;
286 }
287 // We own |thread_|, so it's safe to use Unretained() here.
288 thread_.message_loop()->PostTask(
289 FROM_HERE,
290 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
291 base::Unretained(this)));
292 }
293
294 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
295 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
296
297 Requests requests;
298 {
299 base::AutoLock auto_lock(lock_);
300 requests_.swap(requests);
301 }
302 for (size_t i = 0; i < requests.size(); ++i) {
303 if (requests[i].type == REQUEST_START) {
304 backend_.StartWatching(requests[i].start_data);
305 } else {
306 backend_.StopWatching(requests[i].stop_id);
307 requests[i].stop_event->Signal();
308 }
309 }
310 }
311
243 WatcherThreadManager::WatcherThreadManager() 312 WatcherThreadManager::WatcherThreadManager()
244 : thread_(kWatcherThreadName) { 313 : thread_(kWatcherThreadName) {
245 base::Thread::Options thread_options; 314 base::Thread::Options thread_options;
246 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); 315 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
247 thread_.StartWithOptions(thread_options); 316 thread_.StartWithOptions(thread_options);
248 } 317 }
249 318
250 // HandleWatcher::State -------------------------------------------------------- 319 // HandleWatcher::State --------------------------------------------------------
251 320
252 // Represents the state of the HandleWatcher. Owns the user's callback and 321 // Represents the state of the HandleWatcher. Owns the user's callback and
253 // monitors the current thread's MessageLoop to know when to force the callback 322 // monitors the current thread's MessageLoop to know when to force the callback
254 // to run (with an error) even though the pipe hasn't been signaled yet. 323 // to run (with an error) even though the pipe hasn't been signaled yet.
255 class HandleWatcher::State : public base::MessageLoop::DestructionObserver { 324 class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
256 public: 325 public:
257 State(HandleWatcher* watcher, 326 State(HandleWatcher* watcher,
258 const Handle& handle, 327 const Handle& handle,
259 MojoHandleSignals handle_signals, 328 MojoHandleSignals handle_signals,
260 MojoDeadline deadline, 329 MojoDeadline deadline,
261 const base::Callback<void(MojoResult)>& callback) 330 const base::Callback<void(MojoResult)>& callback)
262 : watcher_(watcher), 331 : watcher_(watcher),
263 callback_(callback), 332 callback_(callback),
333 got_ready_(false),
264 weak_factory_(this) { 334 weak_factory_(this) {
265 base::MessageLoop::current()->AddDestructionObserver(this); 335 base::MessageLoop::current()->AddDestructionObserver(this);
266 336
267 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( 337 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
268 handle, 338 handle,
269 handle_signals, 339 handle_signals,
270 MojoDeadlineToTimeTicks(deadline), 340 MojoDeadlineToTimeTicks(deadline),
271 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr())); 341 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
272 } 342 }
273 343
274 virtual ~State() { 344 virtual ~State() {
275 base::MessageLoop::current()->RemoveDestructionObserver(this); 345 base::MessageLoop::current()->RemoveDestructionObserver(this);
276 346
277 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); 347 // If we've been notified the handle is ready (|got_ready_| is true) then
348 // the watch has been implicitly removed by
349 // WatcherThreadManager/MessagePumpMojo and we don't have to call
350 // StopWatching(). To do so would needlessly entail posting a task and
351 // blocking until the background thread services it.
352 if (!got_ready_)
353 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
278 } 354 }
279 355
280 private: 356 private:
281 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { 357 virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
282 // The current thread is exiting. Simulate a watch error. 358 // The current thread is exiting. Simulate a watch error.
283 OnHandleReady(MOJO_RESULT_ABORTED); 359 NotifyAndDestroy(MOJO_RESULT_ABORTED);
284 } 360 }
285 361
286 void OnHandleReady(MojoResult result) { 362 void OnHandleReady(MojoResult result) {
363 got_ready_ = true;
364 NotifyAndDestroy(result);
365 }
366
367 void NotifyAndDestroy(MojoResult result) {
287 base::Callback<void(MojoResult)> callback = callback_; 368 base::Callback<void(MojoResult)> callback = callback_;
288 watcher_->Stop(); // Destroys |this|. 369 watcher_->Stop(); // Destroys |this|.
289 370
290 callback.Run(result); 371 callback.Run(result);
291 } 372 }
292 373
293 HandleWatcher* watcher_; 374 HandleWatcher* watcher_;
294 WatcherID watcher_id_; 375 WatcherID watcher_id_;
295 base::Callback<void(MojoResult)> callback_; 376 base::Callback<void(MojoResult)> callback_;
296 377
378 // Have we been notified that the handle is ready?
379 bool got_ready_;
380
297 // Used to weakly bind |this| to the WatcherThreadManager. 381 // Used to weakly bind |this| to the WatcherThreadManager.
298 base::WeakPtrFactory<State> weak_factory_; 382 base::WeakPtrFactory<State> weak_factory_;
299 }; 383 };
300 384
301 // HandleWatcher --------------------------------------------------------------- 385 // HandleWatcher ---------------------------------------------------------------
302 386
303 HandleWatcher::HandleWatcher() { 387 HandleWatcher::HandleWatcher() {
304 } 388 }
305 389
306 HandleWatcher::~HandleWatcher() { 390 HandleWatcher::~HandleWatcher() {
307 } 391 }
308 392
309 void HandleWatcher::Start(const Handle& handle, 393 void HandleWatcher::Start(const Handle& handle,
310 MojoHandleSignals handle_signals, 394 MojoHandleSignals handle_signals,
311 MojoDeadline deadline, 395 MojoDeadline deadline,
312 const base::Callback<void(MojoResult)>& callback) { 396 const base::Callback<void(MojoResult)>& callback) {
313 DCHECK(handle.is_valid()); 397 DCHECK(handle.is_valid());
314 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); 398 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
315 399
316 state_.reset(new State(this, handle, handle_signals, deadline, callback)); 400 state_.reset(new State(this, handle, handle_signals, deadline, callback));
317 } 401 }
318 402
319 void HandleWatcher::Stop() { 403 void HandleWatcher::Stop() {
320 state_.reset(); 404 state_.reset();
321 } 405 }
322 406
323 } // namespace common 407 } // namespace common
324 } // namespace mojo 408 } // namespace mojo
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698