Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(12)

Side by Side Diff: mojo/message_pump/handle_watcher.cc

Issue 1475983004: Revert making HandleWatcher block until no longer waiting on pipe (r285266). (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: update comment (undo change in r283888 Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/message_pump/handle_watcher.h ('k') | mojo/message_pump/message_pump_mojo.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/message_pump/handle_watcher.h" 5 #include "mojo/message_pump/handle_watcher.h"
6 6
7 #include <map> 7 #include <map>
8 8
9 #include "base/atomic_sequence_num.h" 9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/lazy_instance.h" 11 #include "base/lazy_instance.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/memory/singleton.h" 14 #include "base/memory/singleton.h"
15 #include "base/memory/weak_ptr.h" 15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h" 16 #include "base/message_loop/message_loop.h"
17 #include "base/profiler/scoped_tracker.h" 17 #include "base/profiler/scoped_tracker.h"
18 #include "base/single_thread_task_runner.h" 18 #include "base/single_thread_task_runner.h"
19 #include "base/synchronization/lock.h" 19 #include "base/synchronization/lock.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/thread_task_runner_handle.h" 20 #include "base/thread_task_runner_handle.h"
22 #include "base/threading/thread.h" 21 #include "base/threading/thread.h"
23 #include "base/threading/thread_restrictions.h"
24 #include "base/time/time.h" 22 #include "base/time/time.h"
25 #include "mojo/message_pump/message_pump_mojo.h" 23 #include "mojo/message_pump/message_pump_mojo.h"
26 #include "mojo/message_pump/message_pump_mojo_handler.h" 24 #include "mojo/message_pump/message_pump_mojo_handler.h"
27 #include "mojo/message_pump/time_helper.h" 25 #include "mojo/message_pump/time_helper.h"
28 26
29 namespace mojo { 27 namespace mojo {
30 namespace common { 28 namespace common {
31 29
32 typedef int WatcherID; 30 typedef int WatcherID;
33 31
(...skipping 23 matching lines...) Expand all
57 55
58 // WatcherBackend is responsible for managing the requests and interacting with 56 // WatcherBackend is responsible for managing the requests and interacting with
59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the 57 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
60 // thread WatcherThreadManager creates. 58 // thread WatcherThreadManager creates.
61 class WatcherBackend : public MessagePumpMojoHandler { 59 class WatcherBackend : public MessagePumpMojoHandler {
62 public: 60 public:
63 WatcherBackend(); 61 WatcherBackend();
64 ~WatcherBackend() override; 62 ~WatcherBackend() override;
65 63
66 void StartWatching(const WatchData& data); 64 void StartWatching(const WatchData& data);
67
68 // Cancels a previously scheduled request to start a watch.
69 void StopWatching(WatcherID watcher_id); 65 void StopWatching(WatcherID watcher_id);
70 66
71 private: 67 private:
72 typedef std::map<Handle, WatchData> HandleToWatchDataMap; 68 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
73 69
74 // Invoked when a handle needs to be removed and notified. 70 // Invoked when a handle needs to be removed and notified.
75 void RemoveAndNotify(const Handle& handle, MojoResult result); 71 void RemoveAndNotify(const Handle& handle, MojoResult result);
76 72
77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found 73 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
78 // and sets |handle| to the Handle. Returns false if not a known id. 74 // and sets |handle| to the Handle. Returns false if not a known id.
(...skipping 23 matching lines...) Expand all
102 handle_to_data_[data.handle] = data; 98 handle_to_data_[data.handle] = data;
103 MessagePumpMojo::current()->AddHandler(this, data.handle, 99 MessagePumpMojo::current()->AddHandler(this, data.handle,
104 data.handle_signals, 100 data.handle_signals,
105 data.deadline); 101 data.deadline);
106 } 102 }
107 103
108 void WatcherBackend::StopWatching(WatcherID watcher_id) { 104 void WatcherBackend::StopWatching(WatcherID watcher_id) {
109 // Because of the thread hop it is entirely possible to get here and not 105 // Because of the thread hop it is entirely possible to get here and not
110 // have a valid handle registered for |watcher_id|. 106 // have a valid handle registered for |watcher_id|.
111 Handle handle; 107 Handle handle;
112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { 108 if (!GetMojoHandleByWatcherID(watcher_id, &handle))
113 handle_to_data_.erase(handle); 109 return;
114 MessagePumpMojo::current()->RemoveHandler(handle); 110
115 } 111 handle_to_data_.erase(handle);
112 MessagePumpMojo::current()->RemoveHandler(handle);
116 } 113 }
117 114
118 void WatcherBackend::RemoveAndNotify(const Handle& handle, 115 void WatcherBackend::RemoveAndNotify(const Handle& handle,
119 MojoResult result) { 116 MojoResult result) {
120 if (handle_to_data_.count(handle) == 0) 117 if (handle_to_data_.count(handle) == 0)
121 return; 118 return;
122 119
123 const WatchData data(handle_to_data_[handle]); 120 const WatchData data(handle_to_data_[handle]);
124 handle_to_data_.erase(handle); 121 handle_to_data_.erase(handle);
125 MessagePumpMojo::current()->RemoveHandler(handle); 122 MessagePumpMojo::current()->RemoveHandler(handle);
(...skipping 18 matching lines...) Expand all
144 } 141 }
145 142
146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { 143 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
147 RemoveAndNotify(handle, result); 144 RemoveAndNotify(handle, result);
148 } 145 }
149 146
150 // WatcherThreadManager -------------------------------------------------------- 147 // WatcherThreadManager --------------------------------------------------------
151 148
152 // WatcherThreadManager manages the background thread that listens for handles 149 // WatcherThreadManager manages the background thread that listens for handles
153 // to be ready. All requests are handled by WatcherBackend. 150 // to be ready. All requests are handled by WatcherBackend.
154 } // namespace
155
156 class WatcherThreadManager { 151 class WatcherThreadManager {
157 public: 152 public:
158 ~WatcherThreadManager(); 153 ~WatcherThreadManager();
159 154
160 // Returns the shared instance. 155 // Returns the shared instance.
161 static WatcherThreadManager* GetInstance(); 156 static WatcherThreadManager* GetInstance();
162 157
163 // Starts watching the requested handle. Returns a unique ID that is used to 158 // Starts watching the requested handle. Returns a unique ID that is used to
164 // stop watching the handle. When the handle is ready |callback| is notified 159 // stop watching the handle. When the handle is ready |callback| is notified
165 // on the thread StartWatching() was invoked on. 160 // on the thread StartWatching() was invoked on.
166 // This may be invoked on any thread. 161 // This may be invoked on any thread.
167 WatcherID StartWatching(const Handle& handle, 162 WatcherID StartWatching(const Handle& handle,
168 MojoHandleSignals handle_signals, 163 MojoHandleSignals handle_signals,
169 base::TimeTicks deadline, 164 base::TimeTicks deadline,
170 const base::Callback<void(MojoResult)>& callback); 165 const base::Callback<void(MojoResult)>& callback);
171 166
172 // Stops watching a handle. 167 // Stops watching a handle.
173 // This may be invoked on any thread. 168 // This may be invoked on any thread.
174 void StopWatching(WatcherID watcher_id); 169 void StopWatching(WatcherID watcher_id);
175 170
176 private: 171 private:
177 enum RequestType { 172 enum RequestType {
178 REQUEST_START, 173 REQUEST_START,
179 REQUEST_STOP, 174 REQUEST_STOP,
180 }; 175 };
181 176
182 // See description of |requests_| for details. 177 // See description of |requests_| for details.
183 struct RequestData { 178 struct RequestData {
184 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} 179 RequestData() : type(REQUEST_START), stop_id(0) {}
185 180
186 RequestType type; 181 RequestType type;
187 WatchData start_data; 182 WatchData start_data;
188 WatcherID stop_id; 183 WatcherID stop_id;
189 base::WaitableEvent* stop_event;
190 }; 184 };
191 185
192 typedef std::vector<RequestData> Requests; 186 typedef std::vector<RequestData> Requests;
193 187
194 friend struct base::DefaultSingletonTraits<WatcherThreadManager>; 188 friend struct base::DefaultSingletonTraits<WatcherThreadManager>;
195 189
196 WatcherThreadManager(); 190 WatcherThreadManager();
197 191
198 // Schedules a request on the background thread. See |requests_| for details. 192 // Schedules a request on the background thread. See |requests_| for details.
199 void AddRequest(const RequestData& data); 193 void AddRequest(const RequestData& data);
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
255 return; 249 return;
256 } 250 }
257 } 251 }
258 } 252 }
259 253
260 // TODO(amistry): Remove ScopedTracker below once http://crbug.com/554761 is 254 // TODO(amistry): Remove ScopedTracker below once http://crbug.com/554761 is
261 // fixed. 255 // fixed.
262 tracked_objects::ScopedTracker tracking_profile( 256 tracked_objects::ScopedTracker tracking_profile(
263 FROM_HERE_WITH_EXPLICIT_FUNCTION( 257 FROM_HERE_WITH_EXPLICIT_FUNCTION(
264 "554761 WatcherThreadManager::StopWatching")); 258 "554761 WatcherThreadManager::StopWatching"));
265 base::ThreadRestrictions::ScopedAllowWait allow_wait;
266 base::WaitableEvent event(true, false);
267 RequestData request_data; 259 RequestData request_data;
268 request_data.type = REQUEST_STOP; 260 request_data.type = REQUEST_STOP;
269 request_data.stop_id = watcher_id; 261 request_data.stop_id = watcher_id;
270 request_data.stop_event = &event;
271 AddRequest(request_data); 262 AddRequest(request_data);
272
273 // We need to block until the handle is actually removed.
274 event.Wait();
275 } 263 }
276 264
277 void WatcherThreadManager::AddRequest(const RequestData& data) { 265 void WatcherThreadManager::AddRequest(const RequestData& data) {
278 { 266 {
279 base::AutoLock auto_lock(lock_); 267 base::AutoLock auto_lock(lock_);
280 const bool was_empty = requests_.empty(); 268 const bool was_empty = requests_.empty();
281 requests_.push_back(data); 269 requests_.push_back(data);
282 if (!was_empty) 270 if (!was_empty)
283 return; 271 return;
284 } 272 }
285 // We own |thread_|, so it's safe to use Unretained() here. 273 // We outlive |thread_|, so it's safe to use Unretained() here.
286 thread_.task_runner()->PostTask( 274 thread_.task_runner()->PostTask(
287 FROM_HERE, 275 FROM_HERE,
288 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, 276 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
289 base::Unretained(this))); 277 base::Unretained(this)));
290 } 278 }
291 279
292 void WatcherThreadManager::ProcessRequestsOnBackendThread() { 280 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
293 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); 281 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
294 282
295 Requests requests; 283 Requests requests;
296 { 284 {
297 base::AutoLock auto_lock(lock_); 285 base::AutoLock auto_lock(lock_);
298 requests_.swap(requests); 286 requests_.swap(requests);
299 } 287 }
300 for (size_t i = 0; i < requests.size(); ++i) { 288 for (size_t i = 0; i < requests.size(); ++i) {
301 if (requests[i].type == REQUEST_START) { 289 if (requests[i].type == REQUEST_START) {
302 backend_.StartWatching(requests[i].start_data); 290 backend_.StartWatching(requests[i].start_data);
303 } else { 291 } else {
304 backend_.StopWatching(requests[i].stop_id); 292 backend_.StopWatching(requests[i].stop_id);
305 requests[i].stop_event->Signal();
306 } 293 }
307 } 294 }
308 } 295 }
309 296
310 WatcherThreadManager::WatcherThreadManager() 297 WatcherThreadManager::WatcherThreadManager()
311 : thread_(kWatcherThreadName) { 298 : thread_(kWatcherThreadName) {
312 base::Thread::Options thread_options; 299 base::Thread::Options thread_options;
313 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); 300 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
314 thread_.StartWithOptions(thread_options); 301 thread_.StartWithOptions(thread_options);
315 } 302 }
316 303
304 } // namespace
305
317 // HandleWatcher::StateBase and subclasses ------------------------------------- 306 // HandleWatcher::StateBase and subclasses -------------------------------------
318 307
319 // The base class of HandleWatcher's state. Owns the user's callback and 308 // The base class of HandleWatcher's state. Owns the user's callback and
320 // monitors the current thread's MessageLoop to know when to force the callback 309 // monitors the current thread's MessageLoop to know when to force the callback
321 // to run (with an error) even though the pipe hasn't been signaled yet. 310 // to run (with an error) even though the pipe hasn't been signaled yet.
322 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { 311 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
323 public: 312 public:
324 StateBase(HandleWatcher* watcher, 313 StateBase(HandleWatcher* watcher,
325 const base::Callback<void(MojoResult)>& callback) 314 const base::Callback<void(MojoResult)>& callback)
326 : watcher_(watcher), 315 : watcher_(watcher),
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
472 this, handle, handle_signals, deadline, callback)); 461 this, handle, handle_signals, deadline, callback));
473 } 462 }
474 } 463 }
475 464
476 void HandleWatcher::Stop() { 465 void HandleWatcher::Stop() {
477 state_.reset(); 466 state_.reset();
478 } 467 }
479 468
480 } // namespace common 469 } // namespace common
481 } // namespace mojo 470 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/message_pump/handle_watcher.h ('k') | mojo/message_pump/message_pump_mojo.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698