| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/connector.h" | 5 #include "mojo/public/cpp/bindings/lib/connector.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 #include <utility> | 8 #include <utility> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 46 | 46 |
| 47 Connector::Connector(ScopedMessagePipeHandle message_pipe, | 47 Connector::Connector(ScopedMessagePipeHandle message_pipe, |
| 48 ConnectorConfig config) | 48 ConnectorConfig config) |
| 49 : message_pipe_(std::move(message_pipe)), | 49 : message_pipe_(std::move(message_pipe)), |
| 50 incoming_receiver_(nullptr), | 50 incoming_receiver_(nullptr), |
| 51 error_(false), | 51 error_(false), |
| 52 drop_writes_(false), | 52 drop_writes_(false), |
| 53 enforce_errors_from_incoming_receiver_(true), | 53 enforce_errors_from_incoming_receiver_(true), |
| 54 paused_(false), | 54 paused_(false), |
| 55 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), | 55 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), |
| 56 register_sync_handle_watch_count_(0), | 56 allow_woken_up_by_others_(false), |
| 57 registered_with_sync_handle_watcher_(false), | |
| 58 sync_handle_watcher_callback_count_(0), | 57 sync_handle_watcher_callback_count_(0), |
| 59 weak_factory_(this) { | 58 weak_factory_(this) { |
| 60 weak_self_ = weak_factory_.GetWeakPtr(); | 59 weak_self_ = weak_factory_.GetWeakPtr(); |
| 61 // Even though we don't have an incoming receiver, we still want to monitor | 60 // Even though we don't have an incoming receiver, we still want to monitor |
| 62 // the message pipe to know if is closed or encounters an error. | 61 // the message pipe to know if is closed or encounters an error. |
| 63 WaitToReadMore(); | 62 WaitToReadMore(); |
| 64 } | 63 } |
| 65 | 64 |
| 66 Connector::~Connector() { | 65 Connector::~Connector() { |
| 67 DCHECK(thread_checker_.CalledOnValidThread()); | 66 DCHECK(thread_checker_.CalledOnValidThread()); |
| 68 | 67 |
| 69 CancelWait(); | 68 CancelWait(); |
| 70 } | 69 } |
| 71 | 70 |
| 72 void Connector::CloseMessagePipe() { | 71 void Connector::CloseMessagePipe() { |
| 73 DCHECK(thread_checker_.CalledOnValidThread()); | 72 DCHECK(thread_checker_.CalledOnValidThread()); |
| 74 | 73 |
| 75 CancelWait(); | 74 CancelWait(); |
| 76 MayAutoLock locker(lock_.get()); | 75 MayAutoLock locker(lock_.get()); |
| 77 Close(std::move(message_pipe_)); | 76 message_pipe_.reset(); |
| 78 } | 77 } |
| 79 | 78 |
| 80 ScopedMessagePipeHandle Connector::PassMessagePipe() { | 79 ScopedMessagePipeHandle Connector::PassMessagePipe() { |
| 81 DCHECK(thread_checker_.CalledOnValidThread()); | 80 DCHECK(thread_checker_.CalledOnValidThread()); |
| 82 | 81 |
| 83 CancelWait(); | 82 CancelWait(); |
| 84 MayAutoLock locker(lock_.get()); | 83 MayAutoLock locker(lock_.get()); |
| 85 return std::move(message_pipe_); | 84 return std::move(message_pipe_); |
| 86 } | 85 } |
| 87 | 86 |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 185 CHECK(false) << "Race condition or other bug detected"; | 184 CHECK(false) << "Race condition or other bug detected"; |
| 186 return false; | 185 return false; |
| 187 default: | 186 default: |
| 188 // This particular write was rejected, presumably because of bad input. | 187 // This particular write was rejected, presumably because of bad input. |
| 189 // The pipe is not necessarily in a bad state. | 188 // The pipe is not necessarily in a bad state. |
| 190 return false; | 189 return false; |
| 191 } | 190 } |
| 192 return true; | 191 return true; |
| 193 } | 192 } |
| 194 | 193 |
| 195 bool Connector::RegisterSyncHandleWatch() { | 194 void Connector::AllowWokenUpBySyncWatchOnSameThread() { |
| 196 DCHECK(thread_checker_.CalledOnValidThread()); | 195 DCHECK(thread_checker_.CalledOnValidThread()); |
| 197 | 196 |
| 198 if (error_) | 197 allow_woken_up_by_others_ = true; |
| 199 return false; | |
| 200 | 198 |
| 201 register_sync_handle_watch_count_++; | 199 EnsureSyncWatcherExists(); |
| 202 | 200 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 203 if (!registered_with_sync_handle_watcher_ && !paused_) { | |
| 204 registered_with_sync_handle_watcher_ = | |
| 205 SyncHandleWatcher::current()->RegisterHandle( | |
| 206 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | |
| 207 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | |
| 208 base::Unretained(this))); | |
| 209 } | |
| 210 return true; | |
| 211 } | 201 } |
| 212 | 202 |
| 213 void Connector::UnregisterSyncHandleWatch() { | 203 bool Connector::SyncWatch(const bool* should_stop) { |
| 214 DCHECK(thread_checker_.CalledOnValidThread()); | 204 DCHECK(thread_checker_.CalledOnValidThread()); |
| 215 | 205 |
| 216 if (register_sync_handle_watch_count_ == 0) { | |
| 217 NOTREACHED(); | |
| 218 return; | |
| 219 } | |
| 220 | |
| 221 register_sync_handle_watch_count_--; | |
| 222 if (register_sync_handle_watch_count_ > 0) | |
| 223 return; | |
| 224 | |
| 225 if (registered_with_sync_handle_watcher_) { | |
| 226 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); | |
| 227 registered_with_sync_handle_watcher_ = false; | |
| 228 } | |
| 229 } | |
| 230 | |
| 231 bool Connector::RunSyncHandleWatch(const bool* should_stop) { | |
| 232 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 233 DCHECK_GT(register_sync_handle_watch_count_, 0u); | |
| 234 | |
| 235 if (error_) | 206 if (error_) |
| 236 return false; | 207 return false; |
| 237 | 208 |
| 238 ResumeIncomingMethodCallProcessing(); | 209 ResumeIncomingMethodCallProcessing(); |
| 239 | 210 |
| 240 if (!should_stop_sync_handle_watch_) | 211 EnsureSyncWatcherExists(); |
| 241 should_stop_sync_handle_watch_ = new base::RefCountedData<bool>(false); | 212 return sync_watcher_->SyncWatch(should_stop); |
| 242 | |
| 243 // This object may be destroyed during the WatchAllHandles() call. So we have | |
| 244 // to preserve the boolean that WatchAllHandles uses. | |
| 245 scoped_refptr<base::RefCountedData<bool>> preserver = | |
| 246 should_stop_sync_handle_watch_; | |
| 247 const bool* should_stop_array[] = {should_stop, | |
| 248 &should_stop_sync_handle_watch_->data}; | |
| 249 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); | |
| 250 } | 213 } |
| 251 | 214 |
| 252 void Connector::OnWatcherHandleReady(MojoResult result) { | 215 void Connector::OnWatcherHandleReady(MojoResult result) { |
| 253 OnHandleReadyInternal(result); | 216 OnHandleReadyInternal(result); |
| 254 } | 217 } |
| 255 | 218 |
| 256 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 219 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
| 257 base::WeakPtr<Connector> weak_self(weak_self_); | 220 base::WeakPtr<Connector> weak_self(weak_self_); |
| 258 | 221 |
| 259 sync_handle_watcher_callback_count_++; | 222 sync_handle_watcher_callback_count_++; |
| (...skipping 24 matching lines...) Expand all Loading... |
| 284 base::Unretained(this))); | 247 base::Unretained(this))); |
| 285 | 248 |
| 286 if (rv != MOJO_RESULT_OK) { | 249 if (rv != MOJO_RESULT_OK) { |
| 287 // If the watch failed because the handle is invalid or its conditions can | 250 // If the watch failed because the handle is invalid or its conditions can |
| 288 // no longer be met, we signal the error asynchronously to avoid reentry. | 251 // no longer be met, we signal the error asynchronously to avoid reentry. |
| 289 base::ThreadTaskRunnerHandle::Get()->PostTask( | 252 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 290 FROM_HERE, | 253 FROM_HERE, |
| 291 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 254 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
| 292 } | 255 } |
| 293 | 256 |
| 294 if (register_sync_handle_watch_count_ > 0 && | 257 if (allow_woken_up_by_others_) { |
| 295 !registered_with_sync_handle_watcher_) { | 258 EnsureSyncWatcherExists(); |
| 296 registered_with_sync_handle_watcher_ = | 259 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 297 SyncHandleWatcher::current()->RegisterHandle( | |
| 298 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | |
| 299 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | |
| 300 base::Unretained(this))); | |
| 301 } | 260 } |
| 302 } | 261 } |
| 303 | 262 |
| 304 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 263 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
| 305 CHECK(!paused_); | 264 CHECK(!paused_); |
| 306 | 265 |
| 307 bool receiver_result = false; | 266 bool receiver_result = false; |
| 308 | 267 |
| 309 // Detect if |this| was destroyed during message dispatch. Allow for the | 268 // Detect if |this| was destroyed during message dispatch. Allow for the |
| 310 // possibility of re-entering ReadMore() through message dispatch. | 269 // possibility of re-entering ReadMore() through message dispatch. |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 348 if (paused_) | 307 if (paused_) |
| 349 return; | 308 return; |
| 350 | 309 |
| 351 if (rv == MOJO_RESULT_SHOULD_WAIT) | 310 if (rv == MOJO_RESULT_SHOULD_WAIT) |
| 352 break; | 311 break; |
| 353 } | 312 } |
| 354 } | 313 } |
| 355 | 314 |
| 356 void Connector::CancelWait() { | 315 void Connector::CancelWait() { |
| 357 handle_watcher_.Cancel(); | 316 handle_watcher_.Cancel(); |
| 358 | 317 sync_watcher_.reset(); |
| 359 if (registered_with_sync_handle_watcher_) { | |
| 360 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); | |
| 361 registered_with_sync_handle_watcher_ = false; | |
| 362 } | |
| 363 | |
| 364 if (should_stop_sync_handle_watch_) | |
| 365 should_stop_sync_handle_watch_->data = true; | |
| 366 } | 318 } |
| 367 | 319 |
| 368 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { | 320 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
| 369 if (error_ || !message_pipe_.is_valid()) | 321 if (error_ || !message_pipe_.is_valid()) |
| 370 return; | 322 return; |
| 371 | 323 |
| 372 if (during_sync_handle_watcher_callback() || paused_) { | 324 if (paused_) { |
| 373 // Enforce calling the error handler asynchronously if: | 325 // Enforce calling the error handler asynchronously if the user has paused |
| 374 // - currently we are in a sync handle watcher callback. We don't want the | 326 // receiving messages. We need to wait until the user starts receiving |
| 375 // error handler to reenter an ongoing sync call. | 327 // messages again. |
| 376 // - the user has paused receiving messages. We need to wait until the user | |
| 377 // starts receiving messages again. | |
| 378 force_async_handler = true; | 328 force_async_handler = true; |
| 379 } | 329 } |
| 380 | 330 |
| 381 if (!force_pipe_reset && force_async_handler) | 331 if (!force_pipe_reset && force_async_handler) |
| 382 force_pipe_reset = true; | 332 force_pipe_reset = true; |
| 383 | 333 |
| 384 if (force_pipe_reset) { | 334 if (force_pipe_reset) { |
| 385 CancelWait(); | 335 CancelWait(); |
| 386 MayAutoLock locker(lock_.get()); | 336 MayAutoLock locker(lock_.get()); |
| 387 Close(std::move(message_pipe_)); | 337 message_pipe_.reset(); |
| 388 MessagePipe dummy_pipe; | 338 MessagePipe dummy_pipe; |
| 389 message_pipe_ = std::move(dummy_pipe.handle0); | 339 message_pipe_ = std::move(dummy_pipe.handle0); |
| 390 } else { | 340 } else { |
| 391 CancelWait(); | 341 CancelWait(); |
| 392 } | 342 } |
| 393 | 343 |
| 394 if (force_async_handler) { | 344 if (force_async_handler) { |
| 395 if (!paused_) | 345 if (!paused_) |
| 396 WaitToReadMore(); | 346 WaitToReadMore(); |
| 397 } else { | 347 } else { |
| 398 error_ = true; | 348 error_ = true; |
| 399 connection_error_handler_.Run(); | 349 connection_error_handler_.Run(); |
| 400 } | 350 } |
| 401 } | 351 } |
| 402 | 352 |
| 353 void Connector::EnsureSyncWatcherExists() { |
| 354 if (sync_watcher_) |
| 355 return; |
| 356 sync_watcher_.reset(new SyncHandleWatcher( |
| 357 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 358 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
| 359 base::Unretained(this)))); |
| 360 } |
| 361 |
| 403 } // namespace internal | 362 } // namespace internal |
| 404 } // namespace mojo | 363 } // namespace mojo |
| OLD | NEW |