OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/connector.h" | 5 #include "mojo/public/cpp/bindings/lib/connector.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/logging.h" | 11 #include "base/logging.h" |
12 #include "base/macros.h" | 12 #include "base/macros.h" |
13 #include "base/synchronization/lock.h" | 13 #include "base/synchronization/lock.h" |
14 #include "base/thread_task_runner_handle.h" | |
15 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" | 14 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
16 | 15 |
17 namespace mojo { | 16 namespace mojo { |
18 namespace internal { | 17 namespace internal { |
19 | 18 |
20 namespace { | 19 namespace { |
21 | 20 |
22 // Similar to base::AutoLock, except that it does nothing if |lock| passed into | 21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into |
23 // the constructor is null. | 22 // the constructor is null. |
24 class MayAutoLock { | 23 class MayAutoLock { |
(...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
241 | 240 |
242 // This object may be destroyed during the WatchAllHandles() call. So we have | 241 // This object may be destroyed during the WatchAllHandles() call. So we have |
243 // to preserve the boolean that WatchAllHandles uses. | 242 // to preserve the boolean that WatchAllHandles uses. |
244 scoped_refptr<base::RefCountedData<bool>> preserver = | 243 scoped_refptr<base::RefCountedData<bool>> preserver = |
245 should_stop_sync_handle_watch_; | 244 should_stop_sync_handle_watch_; |
246 const bool* should_stop_array[] = {should_stop, | 245 const bool* should_stop_array[] = {should_stop, |
247 &should_stop_sync_handle_watch_->data}; | 246 &should_stop_sync_handle_watch_->data}; |
248 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); | 247 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); |
249 } | 248 } |
250 | 249 |
251 void Connector::OnWatcherHandleReady(MojoResult result) { | 250 void Connector::OnHandleWatcherHandleReady(MojoResult result) { |
252 OnHandleReadyInternal(result); | 251 OnHandleReadyInternal(result); |
253 } | 252 } |
254 | 253 |
255 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 254 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
256 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); | 255 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); |
257 | 256 |
258 sync_handle_watcher_callback_count_++; | 257 sync_handle_watcher_callback_count_++; |
259 OnHandleReadyInternal(result); | 258 OnHandleReadyInternal(result); |
260 // At this point, this object might have been deleted. | 259 // At this point, this object might have been deleted. |
261 if (weak_self) | 260 if (weak_self) |
262 sync_handle_watcher_callback_count_--; | 261 sync_handle_watcher_callback_count_--; |
263 } | 262 } |
264 | 263 |
265 void Connector::OnHandleReadyInternal(MojoResult result) { | 264 void Connector::OnHandleReadyInternal(MojoResult result) { |
266 DCHECK(thread_checker_.CalledOnValidThread()); | 265 DCHECK(thread_checker_.CalledOnValidThread()); |
267 | 266 |
268 if (result != MOJO_RESULT_OK) { | 267 if (result != MOJO_RESULT_OK) { |
269 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 268 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
270 return; | 269 return; |
271 } | 270 } |
272 ReadAllAvailableMessages(); | 271 ReadAllAvailableMessages(); |
273 // At this point, this object might have been deleted. Return. | 272 // At this point, this object might have been deleted. Return. |
274 } | 273 } |
275 | 274 |
276 void Connector::WaitToReadMore() { | 275 void Connector::WaitToReadMore() { |
| 276 CHECK(!handle_watcher_.is_watching()); |
277 CHECK(!paused_); | 277 CHECK(!paused_); |
278 DCHECK(!handle_watcher_.IsWatching()); | 278 handle_watcher_.Start(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
279 | 279 MOJO_DEADLINE_INDEFINITE, |
280 MojoResult rv = handle_watcher_.Start( | 280 base::Bind(&Connector::OnHandleWatcherHandleReady, |
281 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 281 base::Unretained(this))); |
282 base::Bind(&Connector::OnWatcherHandleReady, | |
283 base::Unretained(this))); | |
284 | |
285 if (rv != MOJO_RESULT_OK) { | |
286 // If the watch failed because the handle is invalid or its conditions can | |
287 // no longer be met, we signal the error asynchronously to avoid reentry. | |
288 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
289 FROM_HERE, base::Bind(&Connector::OnWatcherHandleReady, | |
290 weak_factory_.GetWeakPtr(), rv)); | |
291 } | |
292 | 282 |
293 if (register_sync_handle_watch_count_ > 0 && | 283 if (register_sync_handle_watch_count_ > 0 && |
294 !registered_with_sync_handle_watcher_) { | 284 !registered_with_sync_handle_watcher_) { |
295 registered_with_sync_handle_watcher_ = | 285 registered_with_sync_handle_watcher_ = |
296 SyncHandleWatcher::current()->RegisterHandle( | 286 SyncHandleWatcher::current()->RegisterHandle( |
297 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 287 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
298 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 288 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
299 base::Unretained(this))); | 289 base::Unretained(this))); |
300 } | 290 } |
301 } | 291 } |
302 | 292 |
303 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 293 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
304 CHECK(!paused_); | 294 CHECK(!paused_); |
305 | 295 |
306 bool receiver_result = false; | 296 bool receiver_result = false; |
307 | 297 |
308 // Detect if |this| was destroyed during message dispatch. Allow for the | 298 // Detect if |this| was destroyed during message dispatch. Allow for the |
309 // possibility of re-entering ReadMore() through message dispatch. | 299 // possibility of re-entering ReadMore() through message dispatch. |
310 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); | 300 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); |
311 | 301 |
312 Message message; | 302 Message message; |
313 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 303 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
314 *read_result = rv; | 304 *read_result = rv; |
315 | 305 |
316 if (rv == MOJO_RESULT_OK) { | 306 if (rv == MOJO_RESULT_OK) { |
| 307 // Dispatching the message may spin in a nested message loop. To ensure we |
| 308 // continue dispatching messages when this happens start listening for |
| 309 // messagse now. |
| 310 if (!handle_watcher_.is_watching()) { |
| 311 // TODO: Need to evaluate the perf impact of this. |
| 312 WaitToReadMore(); |
| 313 } |
317 receiver_result = | 314 receiver_result = |
318 incoming_receiver_ && incoming_receiver_->Accept(&message); | 315 incoming_receiver_ && incoming_receiver_->Accept(&message); |
319 } | 316 } |
320 | 317 |
321 if (!weak_self) | 318 if (!weak_self) |
322 return false; | 319 return false; |
323 | 320 |
324 if (rv == MOJO_RESULT_SHOULD_WAIT) | 321 if (rv == MOJO_RESULT_SHOULD_WAIT) |
325 return true; | 322 return true; |
326 | 323 |
(...skipping 13 matching lines...) Expand all Loading... |
340 while (!error_) { | 337 while (!error_) { |
341 MojoResult rv; | 338 MojoResult rv; |
342 | 339 |
343 // Return immediately if |this| was destroyed. Do not touch any members! | 340 // Return immediately if |this| was destroyed. Do not touch any members! |
344 if (!ReadSingleMessage(&rv)) | 341 if (!ReadSingleMessage(&rv)) |
345 return; | 342 return; |
346 | 343 |
347 if (paused_) | 344 if (paused_) |
348 return; | 345 return; |
349 | 346 |
350 if (rv == MOJO_RESULT_SHOULD_WAIT) | 347 if (rv == MOJO_RESULT_SHOULD_WAIT) { |
| 348 // ReadSingleMessage could end up calling HandleError which resets |
| 349 // message_pipe_ to a dummy one that is closed. The old EDK will see the |
| 350 // that the peer is closed immediately, while the new one is asynchronous |
| 351 // because of thread hops. In that case, there'll still be an async |
| 352 // waiter. |
| 353 if (!handle_watcher_.is_watching()) |
| 354 WaitToReadMore(); |
351 break; | 355 break; |
| 356 } |
352 } | 357 } |
353 } | 358 } |
354 | 359 |
355 void Connector::CancelWait() { | 360 void Connector::CancelWait() { |
356 handle_watcher_.Cancel(); | 361 handle_watcher_.Stop(); |
357 | 362 |
358 if (registered_with_sync_handle_watcher_) { | 363 if (registered_with_sync_handle_watcher_) { |
359 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); | 364 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); |
360 registered_with_sync_handle_watcher_ = false; | 365 registered_with_sync_handle_watcher_ = false; |
361 } | 366 } |
362 | 367 |
363 if (should_stop_sync_handle_watch_) | 368 if (should_stop_sync_handle_watch_) |
364 should_stop_sync_handle_watch_->data = true; | 369 should_stop_sync_handle_watch_->data = true; |
365 } | 370 } |
366 | 371 |
(...skipping 15 matching lines...) Expand all Loading... |
382 CancelWait(); | 387 CancelWait(); |
383 MayAutoLock locker(lock_.get()); | 388 MayAutoLock locker(lock_.get()); |
384 Close(std::move(message_pipe_)); | 389 Close(std::move(message_pipe_)); |
385 MessagePipe dummy_pipe; | 390 MessagePipe dummy_pipe; |
386 message_pipe_ = std::move(dummy_pipe.handle0); | 391 message_pipe_ = std::move(dummy_pipe.handle0); |
387 } else { | 392 } else { |
388 CancelWait(); | 393 CancelWait(); |
389 } | 394 } |
390 | 395 |
391 if (force_async_handler) { | 396 if (force_async_handler) { |
| 397 // |dummy_pipe.handle1| has been destructed. Reading the pipe will |
| 398 // eventually cause a read error on |message_pipe_| and set error state. |
392 if (!paused_) | 399 if (!paused_) |
393 WaitToReadMore(); | 400 WaitToReadMore(); |
394 } else { | 401 } else { |
395 error_ = true; | 402 error_ = true; |
396 connection_error_handler_.Run(); | 403 connection_error_handler_.Run(); |
397 } | 404 } |
398 } | 405 } |
399 | 406 |
400 } // namespace internal | 407 } // namespace internal |
401 } // namespace mojo | 408 } // namespace mojo |
OLD | NEW |