OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/connector.h" | 5 #include "mojo/public/cpp/bindings/lib/connector.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
46 | 46 |
47 Connector::Connector(ScopedMessagePipeHandle message_pipe, | 47 Connector::Connector(ScopedMessagePipeHandle message_pipe, |
48 ConnectorConfig config) | 48 ConnectorConfig config) |
49 : message_pipe_(std::move(message_pipe)), | 49 : message_pipe_(std::move(message_pipe)), |
50 incoming_receiver_(nullptr), | 50 incoming_receiver_(nullptr), |
51 error_(false), | 51 error_(false), |
52 drop_writes_(false), | 52 drop_writes_(false), |
53 enforce_errors_from_incoming_receiver_(true), | 53 enforce_errors_from_incoming_receiver_(true), |
54 paused_(false), | 54 paused_(false), |
55 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), | 55 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), |
56 register_sync_handle_watch_count_(0), | 56 allow_woken_up_by_others_(false), |
57 registered_with_sync_handle_watcher_(false), | |
58 sync_handle_watcher_callback_count_(0), | 57 sync_handle_watcher_callback_count_(0), |
59 weak_factory_(this) { | 58 weak_factory_(this) { |
60 weak_self_ = weak_factory_.GetWeakPtr(); | 59 weak_self_ = weak_factory_.GetWeakPtr(); |
61 // Even though we don't have an incoming receiver, we still want to monitor | 60 // Even though we don't have an incoming receiver, we still want to monitor |
62 // the message pipe to know if is closed or encounters an error. | 61 // the message pipe to know if is closed or encounters an error. |
63 WaitToReadMore(); | 62 WaitToReadMore(); |
64 } | 63 } |
65 | 64 |
66 Connector::~Connector() { | 65 Connector::~Connector() { |
67 DCHECK(thread_checker_.CalledOnValidThread()); | 66 DCHECK(thread_checker_.CalledOnValidThread()); |
68 | 67 |
69 CancelWait(); | 68 CancelWait(); |
70 } | 69 } |
71 | 70 |
72 void Connector::CloseMessagePipe() { | 71 void Connector::CloseMessagePipe() { |
73 DCHECK(thread_checker_.CalledOnValidThread()); | 72 DCHECK(thread_checker_.CalledOnValidThread()); |
74 | 73 |
75 CancelWait(); | 74 CancelWait(); |
76 MayAutoLock locker(lock_.get()); | 75 MayAutoLock locker(lock_.get()); |
77 Close(std::move(message_pipe_)); | 76 message_pipe_.reset(); |
78 } | 77 } |
79 | 78 |
80 ScopedMessagePipeHandle Connector::PassMessagePipe() { | 79 ScopedMessagePipeHandle Connector::PassMessagePipe() { |
81 DCHECK(thread_checker_.CalledOnValidThread()); | 80 DCHECK(thread_checker_.CalledOnValidThread()); |
82 | 81 |
83 CancelWait(); | 82 CancelWait(); |
84 MayAutoLock locker(lock_.get()); | 83 MayAutoLock locker(lock_.get()); |
85 return std::move(message_pipe_); | 84 return std::move(message_pipe_); |
86 } | 85 } |
87 | 86 |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
185 CHECK(false) << "Race condition or other bug detected"; | 184 CHECK(false) << "Race condition or other bug detected"; |
186 return false; | 185 return false; |
187 default: | 186 default: |
188 // This particular write was rejected, presumably because of bad input. | 187 // This particular write was rejected, presumably because of bad input. |
189 // The pipe is not necessarily in a bad state. | 188 // The pipe is not necessarily in a bad state. |
190 return false; | 189 return false; |
191 } | 190 } |
192 return true; | 191 return true; |
193 } | 192 } |
194 | 193 |
195 bool Connector::RegisterSyncHandleWatch() { | 194 void Connector::AllowWokenUpBySyncWatchOnSameThread() { |
196 DCHECK(thread_checker_.CalledOnValidThread()); | 195 DCHECK(thread_checker_.CalledOnValidThread()); |
197 | 196 |
198 if (error_) | 197 allow_woken_up_by_others_ = true; |
199 return false; | |
200 | 198 |
201 register_sync_handle_watch_count_++; | 199 EnsureSyncWatcherExists(); |
202 | 200 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
203 if (!registered_with_sync_handle_watcher_ && !paused_) { | |
204 registered_with_sync_handle_watcher_ = | |
205 SyncHandleWatcher::current()->RegisterHandle( | |
206 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | |
207 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | |
208 base::Unretained(this))); | |
209 } | |
210 return true; | |
211 } | 201 } |
212 | 202 |
213 void Connector::UnregisterSyncHandleWatch() { | 203 bool Connector::SyncWatch(const bool* should_stop) { |
214 DCHECK(thread_checker_.CalledOnValidThread()); | 204 DCHECK(thread_checker_.CalledOnValidThread()); |
215 | 205 |
216 if (register_sync_handle_watch_count_ == 0) { | |
217 NOTREACHED(); | |
218 return; | |
219 } | |
220 | |
221 register_sync_handle_watch_count_--; | |
222 if (register_sync_handle_watch_count_ > 0) | |
223 return; | |
224 | |
225 if (registered_with_sync_handle_watcher_) { | |
226 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); | |
227 registered_with_sync_handle_watcher_ = false; | |
228 } | |
229 } | |
230 | |
231 bool Connector::RunSyncHandleWatch(const bool* should_stop) { | |
232 DCHECK(thread_checker_.CalledOnValidThread()); | |
233 DCHECK_GT(register_sync_handle_watch_count_, 0u); | |
234 | |
235 if (error_) | 206 if (error_) |
236 return false; | 207 return false; |
237 | 208 |
238 ResumeIncomingMethodCallProcessing(); | 209 ResumeIncomingMethodCallProcessing(); |
239 | 210 |
240 if (!should_stop_sync_handle_watch_) | 211 EnsureSyncWatcherExists(); |
241 should_stop_sync_handle_watch_ = new base::RefCountedData<bool>(false); | 212 return sync_watcher_->SyncWatch(should_stop); |
242 | |
243 // This object may be destroyed during the WatchAllHandles() call. So we have | |
244 // to preserve the boolean that WatchAllHandles uses. | |
245 scoped_refptr<base::RefCountedData<bool>> preserver = | |
246 should_stop_sync_handle_watch_; | |
247 const bool* should_stop_array[] = {should_stop, | |
248 &should_stop_sync_handle_watch_->data}; | |
249 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); | |
250 } | 213 } |
251 | 214 |
252 void Connector::OnWatcherHandleReady(MojoResult result) { | 215 void Connector::OnWatcherHandleReady(MojoResult result) { |
253 OnHandleReadyInternal(result); | 216 OnHandleReadyInternal(result); |
254 } | 217 } |
255 | 218 |
256 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 219 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
257 base::WeakPtr<Connector> weak_self(weak_self_); | 220 base::WeakPtr<Connector> weak_self(weak_self_); |
258 | 221 |
259 sync_handle_watcher_callback_count_++; | 222 sync_handle_watcher_callback_count_++; |
(...skipping 24 matching lines...) Expand all Loading... |
284 base::Unretained(this))); | 247 base::Unretained(this))); |
285 | 248 |
286 if (rv != MOJO_RESULT_OK) { | 249 if (rv != MOJO_RESULT_OK) { |
287 // If the watch failed because the handle is invalid or its conditions can | 250 // If the watch failed because the handle is invalid or its conditions can |
288 // no longer be met, we signal the error asynchronously to avoid reentry. | 251 // no longer be met, we signal the error asynchronously to avoid reentry. |
289 base::ThreadTaskRunnerHandle::Get()->PostTask( | 252 base::ThreadTaskRunnerHandle::Get()->PostTask( |
290 FROM_HERE, | 253 FROM_HERE, |
291 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 254 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
292 } | 255 } |
293 | 256 |
294 if (register_sync_handle_watch_count_ > 0 && | 257 if (allow_woken_up_by_others_) { |
295 !registered_with_sync_handle_watcher_) { | 258 EnsureSyncWatcherExists(); |
296 registered_with_sync_handle_watcher_ = | 259 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
297 SyncHandleWatcher::current()->RegisterHandle( | |
298 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | |
299 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | |
300 base::Unretained(this))); | |
301 } | 260 } |
302 } | 261 } |
303 | 262 |
304 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 263 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
305 CHECK(!paused_); | 264 CHECK(!paused_); |
306 | 265 |
307 bool receiver_result = false; | 266 bool receiver_result = false; |
308 | 267 |
309 // Detect if |this| was destroyed during message dispatch. Allow for the | 268 // Detect if |this| was destroyed during message dispatch. Allow for the |
310 // possibility of re-entering ReadMore() through message dispatch. | 269 // possibility of re-entering ReadMore() through message dispatch. |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
348 if (paused_) | 307 if (paused_) |
349 return; | 308 return; |
350 | 309 |
351 if (rv == MOJO_RESULT_SHOULD_WAIT) | 310 if (rv == MOJO_RESULT_SHOULD_WAIT) |
352 break; | 311 break; |
353 } | 312 } |
354 } | 313 } |
355 | 314 |
356 void Connector::CancelWait() { | 315 void Connector::CancelWait() { |
357 handle_watcher_.Cancel(); | 316 handle_watcher_.Cancel(); |
358 | 317 sync_watcher_.reset(); |
359 if (registered_with_sync_handle_watcher_) { | |
360 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); | |
361 registered_with_sync_handle_watcher_ = false; | |
362 } | |
363 | |
364 if (should_stop_sync_handle_watch_) | |
365 should_stop_sync_handle_watch_->data = true; | |
366 } | 318 } |
367 | 319 |
368 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { | 320 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
369 if (error_ || !message_pipe_.is_valid()) | 321 if (error_ || !message_pipe_.is_valid()) |
370 return; | 322 return; |
371 | 323 |
372 if (during_sync_handle_watcher_callback() || paused_) { | 324 if (paused_) { |
373 // Enforce calling the error handler asynchronously if: | 325 // Enforce calling the error handler asynchronously if the user has paused |
374 // - currently we are in a sync handle watcher callback. We don't want the | 326 // receiving messages. We need to wait until the user starts receiving |
375 // error handler to reenter an ongoing sync call. | 327 // messages again. |
376 // - the user has paused receiving messages. We need to wait until the user | |
377 // starts receiving messages again. | |
378 force_async_handler = true; | 328 force_async_handler = true; |
379 } | 329 } |
380 | 330 |
381 if (!force_pipe_reset && force_async_handler) | 331 if (!force_pipe_reset && force_async_handler) |
382 force_pipe_reset = true; | 332 force_pipe_reset = true; |
383 | 333 |
384 if (force_pipe_reset) { | 334 if (force_pipe_reset) { |
385 CancelWait(); | 335 CancelWait(); |
386 MayAutoLock locker(lock_.get()); | 336 MayAutoLock locker(lock_.get()); |
387 Close(std::move(message_pipe_)); | 337 message_pipe_.reset(); |
388 MessagePipe dummy_pipe; | 338 MessagePipe dummy_pipe; |
389 message_pipe_ = std::move(dummy_pipe.handle0); | 339 message_pipe_ = std::move(dummy_pipe.handle0); |
390 } else { | 340 } else { |
391 CancelWait(); | 341 CancelWait(); |
392 } | 342 } |
393 | 343 |
394 if (force_async_handler) { | 344 if (force_async_handler) { |
395 if (!paused_) | 345 if (!paused_) |
396 WaitToReadMore(); | 346 WaitToReadMore(); |
397 } else { | 347 } else { |
398 error_ = true; | 348 error_ = true; |
399 connection_error_handler_.Run(); | 349 connection_error_handler_.Run(); |
400 } | 350 } |
401 } | 351 } |
402 | 352 |
| 353 void Connector::EnsureSyncWatcherExists() { |
| 354 if (sync_watcher_) |
| 355 return; |
| 356 sync_watcher_.reset(new SyncHandleWatcher( |
| 357 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 358 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
| 359 base::Unretained(this)))); |
| 360 } |
| 361 |
403 } // namespace internal | 362 } // namespace internal |
404 } // namespace mojo | 363 } // namespace mojo |
OLD | NEW |