| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/connector.h" | 5 #include "mojo/public/cpp/bindings/lib/connector.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 #include <utility> | 8 #include <utility> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| 11 #include "base/logging.h" | 11 #include "base/logging.h" |
| 12 #include "base/macros.h" | 12 #include "base/macros.h" |
| 13 #include "base/synchronization/lock.h" | 13 #include "base/synchronization/lock.h" |
| 14 #include "base/thread_task_runner_handle.h" | |
| 15 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" | 14 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
| 16 | 15 |
| 17 namespace mojo { | 16 namespace mojo { |
| 18 namespace internal { | 17 namespace internal { |
| 19 | 18 |
| 20 namespace { | 19 namespace { |
| 21 | 20 |
| 22 // Similar to base::AutoLock, except that it does nothing if |lock| passed into | 21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into |
| 23 // the constructor is null. | 22 // the constructor is null. |
| 24 class MayAutoLock { | 23 class MayAutoLock { |
| (...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 241 | 240 |
| 242 // This object may be destroyed during the WatchAllHandles() call. So we have | 241 // This object may be destroyed during the WatchAllHandles() call. So we have |
| 243 // to preserve the boolean that WatchAllHandles uses. | 242 // to preserve the boolean that WatchAllHandles uses. |
| 244 scoped_refptr<base::RefCountedData<bool>> preserver = | 243 scoped_refptr<base::RefCountedData<bool>> preserver = |
| 245 should_stop_sync_handle_watch_; | 244 should_stop_sync_handle_watch_; |
| 246 const bool* should_stop_array[] = {should_stop, | 245 const bool* should_stop_array[] = {should_stop, |
| 247 &should_stop_sync_handle_watch_->data}; | 246 &should_stop_sync_handle_watch_->data}; |
| 248 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); | 247 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); |
| 249 } | 248 } |
| 250 | 249 |
| 251 void Connector::OnWatcherHandleReady(MojoResult result) { | 250 void Connector::OnHandleWatcherHandleReady(MojoResult result) { |
| 252 OnHandleReadyInternal(result); | 251 OnHandleReadyInternal(result); |
| 253 } | 252 } |
| 254 | 253 |
| 255 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 254 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
| 256 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); | 255 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); |
| 257 | 256 |
| 258 sync_handle_watcher_callback_count_++; | 257 sync_handle_watcher_callback_count_++; |
| 259 OnHandleReadyInternal(result); | 258 OnHandleReadyInternal(result); |
| 260 // At this point, this object might have been deleted. | 259 // At this point, this object might have been deleted. |
| 261 if (weak_self) | 260 if (weak_self) |
| 262 sync_handle_watcher_callback_count_--; | 261 sync_handle_watcher_callback_count_--; |
| 263 } | 262 } |
| 264 | 263 |
| 265 void Connector::OnHandleReadyInternal(MojoResult result) { | 264 void Connector::OnHandleReadyInternal(MojoResult result) { |
| 266 DCHECK(thread_checker_.CalledOnValidThread()); | 265 DCHECK(thread_checker_.CalledOnValidThread()); |
| 267 | 266 |
| 268 if (result != MOJO_RESULT_OK) { | 267 if (result != MOJO_RESULT_OK) { |
| 269 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 268 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
| 270 return; | 269 return; |
| 271 } | 270 } |
| 272 ReadAllAvailableMessages(); | 271 ReadAllAvailableMessages(); |
| 273 // At this point, this object might have been deleted. Return. | 272 // At this point, this object might have been deleted. Return. |
| 274 } | 273 } |
| 275 | 274 |
| 276 void Connector::WaitToReadMore() { | 275 void Connector::WaitToReadMore() { |
| 276 CHECK(!handle_watcher_.is_watching()); |
| 277 CHECK(!paused_); | 277 CHECK(!paused_); |
| 278 DCHECK(!handle_watcher_.IsWatching()); | 278 handle_watcher_.Start(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 279 | 279 MOJO_DEADLINE_INDEFINITE, |
| 280 MojoResult rv = handle_watcher_.Start( | 280 base::Bind(&Connector::OnHandleWatcherHandleReady, |
| 281 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 281 base::Unretained(this))); |
| 282 base::Bind(&Connector::OnWatcherHandleReady, | |
| 283 base::Unretained(this))); | |
| 284 | |
| 285 if (rv != MOJO_RESULT_OK) { | |
| 286 // If the watch failed because the handle is invalid or its conditions can | |
| 287 // no longer be met, we signal the error asynchronously to avoid reentry. | |
| 288 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 289 FROM_HERE, base::Bind(&Connector::OnWatcherHandleReady, | |
| 290 weak_factory_.GetWeakPtr(), rv)); | |
| 291 } | |
| 292 | 282 |
| 293 if (register_sync_handle_watch_count_ > 0 && | 283 if (register_sync_handle_watch_count_ > 0 && |
| 294 !registered_with_sync_handle_watcher_) { | 284 !registered_with_sync_handle_watcher_) { |
| 295 registered_with_sync_handle_watcher_ = | 285 registered_with_sync_handle_watcher_ = |
| 296 SyncHandleWatcher::current()->RegisterHandle( | 286 SyncHandleWatcher::current()->RegisterHandle( |
| 297 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 287 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 298 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 288 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
| 299 base::Unretained(this))); | 289 base::Unretained(this))); |
| 300 } | 290 } |
| 301 } | 291 } |
| 302 | 292 |
| 303 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 293 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
| 304 CHECK(!paused_); | 294 CHECK(!paused_); |
| 305 | 295 |
| 306 bool receiver_result = false; | 296 bool receiver_result = false; |
| 307 | 297 |
| 308 // Detect if |this| was destroyed during message dispatch. Allow for the | 298 // Detect if |this| was destroyed during message dispatch. Allow for the |
| 309 // possibility of re-entering ReadMore() through message dispatch. | 299 // possibility of re-entering ReadMore() through message dispatch. |
| 310 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); | 300 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); |
| 311 | 301 |
| 312 Message message; | 302 Message message; |
| 313 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 303 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
| 314 *read_result = rv; | 304 *read_result = rv; |
| 315 | 305 |
| 316 if (rv == MOJO_RESULT_OK) { | 306 if (rv == MOJO_RESULT_OK) { |
| 307 // Dispatching the message may spin in a nested message loop. To ensure we |
| 308 // continue dispatching messages when this happens start listening for |
| 309 // messagse now. |
| 310 if (!handle_watcher_.is_watching()) { |
| 311 // TODO: Need to evaluate the perf impact of this. |
| 312 WaitToReadMore(); |
| 313 } |
| 317 receiver_result = | 314 receiver_result = |
| 318 incoming_receiver_ && incoming_receiver_->Accept(&message); | 315 incoming_receiver_ && incoming_receiver_->Accept(&message); |
| 319 } | 316 } |
| 320 | 317 |
| 321 if (!weak_self) | 318 if (!weak_self) |
| 322 return false; | 319 return false; |
| 323 | 320 |
| 324 if (rv == MOJO_RESULT_SHOULD_WAIT) | 321 if (rv == MOJO_RESULT_SHOULD_WAIT) |
| 325 return true; | 322 return true; |
| 326 | 323 |
| (...skipping 13 matching lines...) Expand all Loading... |
| 340 while (!error_) { | 337 while (!error_) { |
| 341 MojoResult rv; | 338 MojoResult rv; |
| 342 | 339 |
| 343 // Return immediately if |this| was destroyed. Do not touch any members! | 340 // Return immediately if |this| was destroyed. Do not touch any members! |
| 344 if (!ReadSingleMessage(&rv)) | 341 if (!ReadSingleMessage(&rv)) |
| 345 return; | 342 return; |
| 346 | 343 |
| 347 if (paused_) | 344 if (paused_) |
| 348 return; | 345 return; |
| 349 | 346 |
| 350 if (rv == MOJO_RESULT_SHOULD_WAIT) | 347 if (rv == MOJO_RESULT_SHOULD_WAIT) { |
| 348 // ReadSingleMessage could end up calling HandleError which resets |
| 349 // message_pipe_ to a dummy one that is closed. The old EDK will see the |
| 350 // that the peer is closed immediately, while the new one is asynchronous |
| 351 // because of thread hops. In that case, there'll still be an async |
| 352 // waiter. |
| 353 if (!handle_watcher_.is_watching()) |
| 354 WaitToReadMore(); |
| 351 break; | 355 break; |
| 356 } |
| 352 } | 357 } |
| 353 } | 358 } |
| 354 | 359 |
| 355 void Connector::CancelWait() { | 360 void Connector::CancelWait() { |
| 356 handle_watcher_.Cancel(); | 361 handle_watcher_.Stop(); |
| 357 | 362 |
| 358 if (registered_with_sync_handle_watcher_) { | 363 if (registered_with_sync_handle_watcher_) { |
| 359 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); | 364 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); |
| 360 registered_with_sync_handle_watcher_ = false; | 365 registered_with_sync_handle_watcher_ = false; |
| 361 } | 366 } |
| 362 | 367 |
| 363 if (should_stop_sync_handle_watch_) | 368 if (should_stop_sync_handle_watch_) |
| 364 should_stop_sync_handle_watch_->data = true; | 369 should_stop_sync_handle_watch_->data = true; |
| 365 } | 370 } |
| 366 | 371 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 382 CancelWait(); | 387 CancelWait(); |
| 383 MayAutoLock locker(lock_.get()); | 388 MayAutoLock locker(lock_.get()); |
| 384 Close(std::move(message_pipe_)); | 389 Close(std::move(message_pipe_)); |
| 385 MessagePipe dummy_pipe; | 390 MessagePipe dummy_pipe; |
| 386 message_pipe_ = std::move(dummy_pipe.handle0); | 391 message_pipe_ = std::move(dummy_pipe.handle0); |
| 387 } else { | 392 } else { |
| 388 CancelWait(); | 393 CancelWait(); |
| 389 } | 394 } |
| 390 | 395 |
| 391 if (force_async_handler) { | 396 if (force_async_handler) { |
| 397 // |dummy_pipe.handle1| has been destructed. Reading the pipe will |
| 398 // eventually cause a read error on |message_pipe_| and set error state. |
| 392 if (!paused_) | 399 if (!paused_) |
| 393 WaitToReadMore(); | 400 WaitToReadMore(); |
| 394 } else { | 401 } else { |
| 395 error_ = true; | 402 error_ = true; |
| 396 connection_error_handler_.Run(); | 403 connection_error_handler_.Run(); |
| 397 } | 404 } |
| 398 } | 405 } |
| 399 | 406 |
| 400 } // namespace internal | 407 } // namespace internal |
| 401 } // namespace mojo | 408 } // namespace mojo |
| OLD | NEW |