| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 171 } | 171 } |
| 172 | 172 |
| 173 void OnHandleReady(MojoResult result) { | 173 void OnHandleReady(MojoResult result) { |
| 174 DCHECK(task_runner_->BelongsToCurrentThread()); | 174 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 175 scoped_refptr<MultiplexRouter> router_protector(router_); | 175 scoped_refptr<MultiplexRouter> router_protector(router_); |
| 176 | 176 |
| 177 // Because we never close |sync_message_event_{sender,receiver}_| before | 177 // Because we never close |sync_message_event_{sender,receiver}_| before |
| 178 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. | 178 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
| 179 DCHECK_EQ(MOJO_RESULT_OK, result); | 179 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 180 | 180 |
| 181 MayAutoLock locker(router_->lock_.get()); | 181 MayAutoLock locker(&router_->lock_); |
| 182 scoped_refptr<InterfaceEndpoint> self_protector(this); | 182 scoped_refptr<InterfaceEndpoint> self_protector(this); |
| 183 | 183 |
| 184 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); | 184 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
| 185 | 185 |
| 186 if (!more_to_process) | 186 if (!more_to_process) |
| 187 ResetSyncMessageSignal(); | 187 ResetSyncMessageSignal(); |
| 188 | 188 |
| 189 // Currently there are no queued sync messages and the peer has closed so | 189 // Currently there are no queued sync messages and the peer has closed so |
| 190 // there won't be incoming sync messages in the future. | 190 // there won't be incoming sync messages in the future. |
| 191 if (!more_to_process && peer_closed_) { | 191 if (!more_to_process && peer_closed_) { |
| 192 // If a SyncWatch() call (or multiple ones) of this interface endpoint is | 192 // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
| 193 // on the call stack, resetting the sync watcher will allow it to exit | 193 // on the call stack, resetting the sync watcher will allow it to exit |
| 194 // when the call stack unwinds to that frame. | 194 // when the call stack unwinds to that frame. |
| 195 sync_watcher_.reset(); | 195 sync_watcher_.reset(); |
| 196 } | 196 } |
| 197 } | 197 } |
| 198 | 198 |
| 199 void EnsureSyncWatcherExists() { | 199 void EnsureSyncWatcherExists() { |
| 200 DCHECK(task_runner_->BelongsToCurrentThread()); | 200 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 201 if (sync_watcher_) | 201 if (sync_watcher_) |
| 202 return; | 202 return; |
| 203 | 203 |
| 204 { | 204 { |
| 205 MayAutoLock locker(router_->lock_.get()); | 205 MayAutoLock locker(&router_->lock_); |
| 206 EnsureEventMessagePipeExists(); | 206 EnsureEventMessagePipeExists(); |
| 207 | 207 |
| 208 auto iter = router_->sync_message_tasks_.find(id_); | 208 auto iter = router_->sync_message_tasks_.find(id_); |
| 209 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) | 209 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
| 210 SignalSyncMessageEvent(); | 210 SignalSyncMessageEvent(); |
| 211 } | 211 } |
| 212 | 212 |
| 213 sync_watcher_.reset(new SyncHandleWatcher( | 213 sync_watcher_.reset(new SyncHandleWatcher( |
| 214 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 214 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 215 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); | 215 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 282 | 282 |
| 283 MessageWrapper(MessageWrapper&& other) | 283 MessageWrapper(MessageWrapper&& other) |
| 284 : router_(other.router_), value_(std::move(other.value_)) {} | 284 : router_(other.router_), value_(std::move(other.value_)) {} |
| 285 | 285 |
| 286 ~MessageWrapper() { | 286 ~MessageWrapper() { |
| 287 if (value_.associated_endpoint_handles()->empty()) | 287 if (value_.associated_endpoint_handles()->empty()) |
| 288 return; | 288 return; |
| 289 | 289 |
| 290 router_->AssertLockAcquired(); | 290 router_->AssertLockAcquired(); |
| 291 { | 291 { |
| 292 MayAutoUnlock unlocker(router_->lock_.get()); | 292 MayAutoUnlock unlocker(&router_->lock_); |
| 293 value_.mutable_associated_endpoint_handles()->clear(); | 293 value_.mutable_associated_endpoint_handles()->clear(); |
| 294 } | 294 } |
| 295 } | 295 } |
| 296 | 296 |
| 297 MessageWrapper& operator=(MessageWrapper&& other) { | 297 MessageWrapper& operator=(MessageWrapper&& other) { |
| 298 router_ = other.router_; | 298 router_ = other.router_; |
| 299 value_ = std::move(other.value_); | 299 value_ = std::move(other.value_); |
| 300 return *this; | 300 return *this; |
| 301 } | 301 } |
| 302 | 302 |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 348 bool set_interface_id_namesapce_bit, | 348 bool set_interface_id_namesapce_bit, |
| 349 scoped_refptr<base::SingleThreadTaskRunner> runner) | 349 scoped_refptr<base::SingleThreadTaskRunner> runner) |
| 350 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 350 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 351 task_runner_(runner), | 351 task_runner_(runner), |
| 352 header_validator_(nullptr), | 352 header_validator_(nullptr), |
| 353 filters_(this), | 353 filters_(this), |
| 354 connector_(std::move(message_pipe), | 354 connector_(std::move(message_pipe), |
| 355 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND | 355 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND |
| 356 : Connector::SINGLE_THREADED_SEND, | 356 : Connector::SINGLE_THREADED_SEND, |
| 357 std::move(runner)), | 357 std::move(runner)), |
| 358 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr), | |
| 359 control_message_handler_(this), | 358 control_message_handler_(this), |
| 360 control_message_proxy_(&connector_), | 359 control_message_proxy_(&connector_), |
| 361 next_interface_id_value_(1), | 360 next_interface_id_value_(1), |
| 362 posted_to_process_tasks_(false), | 361 posted_to_process_tasks_(false), |
| 363 encountered_error_(false), | 362 encountered_error_(false), |
| 364 paused_(false), | 363 paused_(false), |
| 365 testing_mode_(false) { | 364 testing_mode_(false) { |
| 366 DCHECK(task_runner_->BelongsToCurrentThread()); | 365 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 367 | 366 |
| 367 if (config == MULTI_INTERFACE) |
| 368 lock_.emplace(); |
| 369 |
| 368 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || | 370 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || |
| 369 config == MULTI_INTERFACE) { | 371 config == MULTI_INTERFACE) { |
| 370 // Always participate in sync handle watching in multi-interface mode, | 372 // Always participate in sync handle watching in multi-interface mode, |
| 371 // because even if it doesn't expect sync requests during sync handle | 373 // because even if it doesn't expect sync requests during sync handle |
| 372 // watching, it may still need to dispatch messages to associated endpoints | 374 // watching, it may still need to dispatch messages to associated endpoints |
| 373 // on a different thread. | 375 // on a different thread. |
| 374 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 376 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| 375 } | 377 } |
| 376 connector_.set_incoming_receiver(&filters_); | 378 connector_.set_incoming_receiver(&filters_); |
| 377 connector_.set_connection_error_handler( | 379 connector_.set_connection_error_handler( |
| 378 base::Bind(&MultiplexRouter::OnPipeConnectionError, | 380 base::Bind(&MultiplexRouter::OnPipeConnectionError, |
| 379 base::Unretained(this))); | 381 base::Unretained(this))); |
| 380 | 382 |
| 381 std::unique_ptr<MessageHeaderValidator> header_validator = | 383 std::unique_ptr<MessageHeaderValidator> header_validator = |
| 382 base::MakeUnique<MessageHeaderValidator>(); | 384 base::MakeUnique<MessageHeaderValidator>(); |
| 383 header_validator_ = header_validator.get(); | 385 header_validator_ = header_validator.get(); |
| 384 filters_.Append(std::move(header_validator)); | 386 filters_.Append(std::move(header_validator)); |
| 385 } | 387 } |
| 386 | 388 |
| 387 MultiplexRouter::~MultiplexRouter() { | 389 MultiplexRouter::~MultiplexRouter() { |
| 388 MayAutoLock locker(lock_.get()); | 390 MayAutoLock locker(&lock_); |
| 389 | 391 |
| 390 sync_message_tasks_.clear(); | 392 sync_message_tasks_.clear(); |
| 391 tasks_.clear(); | 393 tasks_.clear(); |
| 392 | 394 |
| 393 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 395 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| 394 InterfaceEndpoint* endpoint = iter->second.get(); | 396 InterfaceEndpoint* endpoint = iter->second.get(); |
| 395 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 397 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 396 // because it may remove the corresponding value from the map. | 398 // because it may remove the corresponding value from the map. |
| 397 ++iter; | 399 ++iter; |
| 398 | 400 |
| (...skipping 20 matching lines...) Expand all Loading... |
| 419 connector_.SetWatcherHeapProfilerTag(name); | 421 connector_.SetWatcherHeapProfilerTag(name); |
| 420 } | 422 } |
| 421 | 423 |
| 422 InterfaceId MultiplexRouter::AssociateInterface( | 424 InterfaceId MultiplexRouter::AssociateInterface( |
| 423 ScopedInterfaceEndpointHandle handle_to_send) { | 425 ScopedInterfaceEndpointHandle handle_to_send) { |
| 424 if (!handle_to_send.pending_association()) | 426 if (!handle_to_send.pending_association()) |
| 425 return kInvalidInterfaceId; | 427 return kInvalidInterfaceId; |
| 426 | 428 |
| 427 uint32_t id = 0; | 429 uint32_t id = 0; |
| 428 { | 430 { |
| 429 MayAutoLock locker(lock_.get()); | 431 MayAutoLock locker(&lock_); |
| 430 do { | 432 do { |
| 431 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) | 433 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) |
| 432 next_interface_id_value_ = 1; | 434 next_interface_id_value_ = 1; |
| 433 id = next_interface_id_value_++; | 435 id = next_interface_id_value_++; |
| 434 if (set_interface_id_namespace_bit_) | 436 if (set_interface_id_namespace_bit_) |
| 435 id |= kInterfaceIdNamespaceMask; | 437 id |= kInterfaceIdNamespaceMask; |
| 436 } while (base::ContainsKey(endpoints_, id)); | 438 } while (base::ContainsKey(endpoints_, id)); |
| 437 | 439 |
| 438 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | 440 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| 439 endpoints_[id] = endpoint; | 441 endpoints_[id] = endpoint; |
| 440 if (encountered_error_) | 442 if (encountered_error_) |
| 441 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 443 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 442 endpoint->set_handle_created(); | 444 endpoint->set_handle_created(); |
| 443 } | 445 } |
| 444 | 446 |
| 445 if (!NotifyAssociation(&handle_to_send, id)) { | 447 if (!NotifyAssociation(&handle_to_send, id)) { |
| 446 // The peer handle of |handle_to_send|, which is supposed to join this | 448 // The peer handle of |handle_to_send|, which is supposed to join this |
| 447 // associated group, has been closed. | 449 // associated group, has been closed. |
| 448 { | 450 { |
| 449 MayAutoLock locker(lock_.get()); | 451 MayAutoLock locker(&lock_); |
| 450 InterfaceEndpoint* endpoint = FindEndpoint(id); | 452 InterfaceEndpoint* endpoint = FindEndpoint(id); |
| 451 if (endpoint) | 453 if (endpoint) |
| 452 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 454 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 453 } | 455 } |
| 454 | 456 |
| 455 control_message_proxy_.NotifyPeerEndpointClosed( | 457 control_message_proxy_.NotifyPeerEndpointClosed( |
| 456 id, handle_to_send.disconnect_reason()); | 458 id, handle_to_send.disconnect_reason()); |
| 457 } | 459 } |
| 458 return id; | 460 return id; |
| 459 } | 461 } |
| 460 | 462 |
| 461 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | 463 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| 462 InterfaceId id) { | 464 InterfaceId id) { |
| 463 if (!IsValidInterfaceId(id)) | 465 if (!IsValidInterfaceId(id)) |
| 464 return ScopedInterfaceEndpointHandle(); | 466 return ScopedInterfaceEndpointHandle(); |
| 465 | 467 |
| 466 MayAutoLock locker(lock_.get()); | 468 MayAutoLock locker(&lock_); |
| 467 bool inserted = false; | 469 bool inserted = false; |
| 468 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 470 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 469 if (inserted) { | 471 if (inserted) { |
| 470 DCHECK(!endpoint->handle_created()); | 472 DCHECK(!endpoint->handle_created()); |
| 471 | 473 |
| 472 if (encountered_error_) | 474 if (encountered_error_) |
| 473 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 475 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 474 } else { | 476 } else { |
| 475 // If the endpoint already exist, it is because we have received a | 477 // If the endpoint already exist, it is because we have received a |
| 476 // notification that the peer endpoint has closed. | 478 // notification that the peer endpoint has closed. |
| 477 CHECK(!endpoint->closed()); | 479 CHECK(!endpoint->closed()); |
| 478 CHECK(endpoint->peer_closed()); | 480 CHECK(endpoint->peer_closed()); |
| 479 | 481 |
| 480 if (endpoint->handle_created()) | 482 if (endpoint->handle_created()) |
| 481 return ScopedInterfaceEndpointHandle(); | 483 return ScopedInterfaceEndpointHandle(); |
| 482 } | 484 } |
| 483 | 485 |
| 484 endpoint->set_handle_created(); | 486 endpoint->set_handle_created(); |
| 485 return CreateScopedInterfaceEndpointHandle(id); | 487 return CreateScopedInterfaceEndpointHandle(id); |
| 486 } | 488 } |
| 487 | 489 |
| 488 void MultiplexRouter::CloseEndpointHandle( | 490 void MultiplexRouter::CloseEndpointHandle( |
| 489 InterfaceId id, | 491 InterfaceId id, |
| 490 const base::Optional<DisconnectReason>& reason) { | 492 const base::Optional<DisconnectReason>& reason) { |
| 491 if (!IsValidInterfaceId(id)) | 493 if (!IsValidInterfaceId(id)) |
| 492 return; | 494 return; |
| 493 | 495 |
| 494 MayAutoLock locker(lock_.get()); | 496 MayAutoLock locker(&lock_); |
| 495 DCHECK(base::ContainsKey(endpoints_, id)); | 497 DCHECK(base::ContainsKey(endpoints_, id)); |
| 496 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 498 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 497 DCHECK(!endpoint->client()); | 499 DCHECK(!endpoint->client()); |
| 498 DCHECK(!endpoint->closed()); | 500 DCHECK(!endpoint->closed()); |
| 499 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 501 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 500 | 502 |
| 501 if (!IsMasterInterfaceId(id) || reason) { | 503 if (!IsMasterInterfaceId(id) || reason) { |
| 502 MayAutoUnlock unlocker(lock_.get()); | 504 MayAutoUnlock unlocker(&lock_); |
| 503 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); | 505 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); |
| 504 } | 506 } |
| 505 | 507 |
| 506 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 508 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 507 } | 509 } |
| 508 | 510 |
| 509 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( | 511 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
| 510 const ScopedInterfaceEndpointHandle& handle, | 512 const ScopedInterfaceEndpointHandle& handle, |
| 511 InterfaceEndpointClient* client, | 513 InterfaceEndpointClient* client, |
| 512 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 514 scoped_refptr<base::SingleThreadTaskRunner> runner) { |
| 513 const InterfaceId id = handle.id(); | 515 const InterfaceId id = handle.id(); |
| 514 | 516 |
| 515 DCHECK(IsValidInterfaceId(id)); | 517 DCHECK(IsValidInterfaceId(id)); |
| 516 DCHECK(client); | 518 DCHECK(client); |
| 517 | 519 |
| 518 MayAutoLock locker(lock_.get()); | 520 MayAutoLock locker(&lock_); |
| 519 DCHECK(base::ContainsKey(endpoints_, id)); | 521 DCHECK(base::ContainsKey(endpoints_, id)); |
| 520 | 522 |
| 521 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 523 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 522 endpoint->AttachClient(client, std::move(runner)); | 524 endpoint->AttachClient(client, std::move(runner)); |
| 523 | 525 |
| 524 if (endpoint->peer_closed()) | 526 if (endpoint->peer_closed()) |
| 525 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 527 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 528 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 527 | 529 |
| 528 return endpoint; | 530 return endpoint; |
| 529 } | 531 } |
| 530 | 532 |
| 531 void MultiplexRouter::DetachEndpointClient( | 533 void MultiplexRouter::DetachEndpointClient( |
| 532 const ScopedInterfaceEndpointHandle& handle) { | 534 const ScopedInterfaceEndpointHandle& handle) { |
| 533 const InterfaceId id = handle.id(); | 535 const InterfaceId id = handle.id(); |
| 534 | 536 |
| 535 DCHECK(IsValidInterfaceId(id)); | 537 DCHECK(IsValidInterfaceId(id)); |
| 536 | 538 |
| 537 MayAutoLock locker(lock_.get()); | 539 MayAutoLock locker(&lock_); |
| 538 DCHECK(base::ContainsKey(endpoints_, id)); | 540 DCHECK(base::ContainsKey(endpoints_, id)); |
| 539 | 541 |
| 540 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 542 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 541 endpoint->DetachClient(); | 543 endpoint->DetachClient(); |
| 542 } | 544 } |
| 543 | 545 |
| 544 void MultiplexRouter::RaiseError() { | 546 void MultiplexRouter::RaiseError() { |
| 545 if (task_runner_->BelongsToCurrentThread()) { | 547 if (task_runner_->BelongsToCurrentThread()) { |
| 546 connector_.RaiseError(); | 548 connector_.RaiseError(); |
| 547 } else { | 549 } else { |
| 548 task_runner_->PostTask(FROM_HERE, | 550 task_runner_->PostTask(FROM_HERE, |
| 549 base::Bind(&MultiplexRouter::RaiseError, this)); | 551 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 550 } | 552 } |
| 551 } | 553 } |
| 552 | 554 |
| 553 void MultiplexRouter::CloseMessagePipe() { | 555 void MultiplexRouter::CloseMessagePipe() { |
| 554 DCHECK(thread_checker_.CalledOnValidThread()); | 556 DCHECK(thread_checker_.CalledOnValidThread()); |
| 555 connector_.CloseMessagePipe(); | 557 connector_.CloseMessagePipe(); |
| 556 // CloseMessagePipe() above won't trigger connection error handler. | 558 // CloseMessagePipe() above won't trigger connection error handler. |
| 557 // Explicitly call OnPipeConnectionError() so that associated endpoints will | 559 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
| 558 // get notified. | 560 // get notified. |
| 559 OnPipeConnectionError(); | 561 OnPipeConnectionError(); |
| 560 } | 562 } |
| 561 | 563 |
| 562 void MultiplexRouter::PauseIncomingMethodCallProcessing() { | 564 void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
| 563 DCHECK(thread_checker_.CalledOnValidThread()); | 565 DCHECK(thread_checker_.CalledOnValidThread()); |
| 564 connector_.PauseIncomingMethodCallProcessing(); | 566 connector_.PauseIncomingMethodCallProcessing(); |
| 565 | 567 |
| 566 MayAutoLock locker(lock_.get()); | 568 MayAutoLock locker(&lock_); |
| 567 paused_ = true; | 569 paused_ = true; |
| 568 | 570 |
| 569 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) | 571 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
| 570 iter->second->ResetSyncMessageSignal(); | 572 iter->second->ResetSyncMessageSignal(); |
| 571 } | 573 } |
| 572 | 574 |
| 573 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { | 575 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
| 574 DCHECK(thread_checker_.CalledOnValidThread()); | 576 DCHECK(thread_checker_.CalledOnValidThread()); |
| 575 connector_.ResumeIncomingMethodCallProcessing(); | 577 connector_.ResumeIncomingMethodCallProcessing(); |
| 576 | 578 |
| 577 MayAutoLock locker(lock_.get()); | 579 MayAutoLock locker(&lock_); |
| 578 paused_ = false; | 580 paused_ = false; |
| 579 | 581 |
| 580 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { | 582 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
| 581 auto sync_iter = sync_message_tasks_.find(iter->first); | 583 auto sync_iter = sync_message_tasks_.find(iter->first); |
| 582 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) | 584 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
| 583 iter->second->SignalSyncMessageEvent(); | 585 iter->second->SignalSyncMessageEvent(); |
| 584 } | 586 } |
| 585 | 587 |
| 586 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 588 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 587 } | 589 } |
| 588 | 590 |
| 589 bool MultiplexRouter::HasAssociatedEndpoints() const { | 591 bool MultiplexRouter::HasAssociatedEndpoints() const { |
| 590 DCHECK(thread_checker_.CalledOnValidThread()); | 592 DCHECK(thread_checker_.CalledOnValidThread()); |
| 591 MayAutoLock locker(lock_.get()); | 593 MayAutoLock locker(&lock_); |
| 592 | 594 |
| 593 if (endpoints_.size() > 1) | 595 if (endpoints_.size() > 1) |
| 594 return true; | 596 return true; |
| 595 if (endpoints_.size() == 0) | 597 if (endpoints_.size() == 0) |
| 596 return false; | 598 return false; |
| 597 | 599 |
| 598 return !base::ContainsKey(endpoints_, kMasterInterfaceId); | 600 return !base::ContainsKey(endpoints_, kMasterInterfaceId); |
| 599 } | 601 } |
| 600 | 602 |
| 601 void MultiplexRouter::EnableTestingMode() { | 603 void MultiplexRouter::EnableTestingMode() { |
| 602 DCHECK(thread_checker_.CalledOnValidThread()); | 604 DCHECK(thread_checker_.CalledOnValidThread()); |
| 603 MayAutoLock locker(lock_.get()); | 605 MayAutoLock locker(&lock_); |
| 604 | 606 |
| 605 testing_mode_ = true; | 607 testing_mode_ = true; |
| 606 connector_.set_enforce_errors_from_incoming_receiver(false); | 608 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 607 } | 609 } |
| 608 | 610 |
| 609 bool MultiplexRouter::Accept(Message* message) { | 611 bool MultiplexRouter::Accept(Message* message) { |
| 610 DCHECK(thread_checker_.CalledOnValidThread()); | 612 DCHECK(thread_checker_.CalledOnValidThread()); |
| 611 | 613 |
| 612 if (!message->DeserializeAssociatedEndpointHandles(this)) | 614 if (!message->DeserializeAssociatedEndpointHandles(this)) |
| 613 return false; | 615 return false; |
| 614 | 616 |
| 615 scoped_refptr<MultiplexRouter> protector(this); | 617 scoped_refptr<MultiplexRouter> protector(this); |
| 616 MayAutoLock locker(lock_.get()); | 618 MayAutoLock locker(&lock_); |
| 617 | 619 |
| 618 DCHECK(!paused_); | 620 DCHECK(!paused_); |
| 619 | 621 |
| 620 ClientCallBehavior client_call_behavior = | 622 ClientCallBehavior client_call_behavior = |
| 621 connector_.during_sync_handle_watcher_callback() | 623 connector_.during_sync_handle_watcher_callback() |
| 622 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 624 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 623 : ALLOW_DIRECT_CLIENT_CALLS; | 625 : ALLOW_DIRECT_CLIENT_CALLS; |
| 624 | 626 |
| 625 bool processed = | 627 bool processed = |
| 626 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, | 628 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, |
| (...skipping 23 matching lines...) Expand all Loading... |
| 650 // Always return true. If we see errors during message processing, we will | 652 // Always return true. If we see errors during message processing, we will |
| 651 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 653 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 652 return true; | 654 return true; |
| 653 } | 655 } |
| 654 | 656 |
| 655 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( | 657 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( |
| 656 InterfaceId id, | 658 InterfaceId id, |
| 657 const base::Optional<DisconnectReason>& reason) { | 659 const base::Optional<DisconnectReason>& reason) { |
| 658 DCHECK(!IsMasterInterfaceId(id) || reason); | 660 DCHECK(!IsMasterInterfaceId(id) || reason); |
| 659 | 661 |
| 660 MayAutoLock locker(lock_.get()); | 662 MayAutoLock locker(&lock_); |
| 661 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 663 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 662 | 664 |
| 663 if (reason) | 665 if (reason) |
| 664 endpoint->set_disconnect_reason(reason); | 666 endpoint->set_disconnect_reason(reason); |
| 665 | 667 |
| 666 // It is possible that this endpoint has been set as peer closed. That is | 668 // It is possible that this endpoint has been set as peer closed. That is |
| 667 // because when the message pipe is closed, all the endpoints are updated with | 669 // because when the message pipe is closed, all the endpoints are updated with |
| 668 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, | 670 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, |
| 669 // as long as there are refs keeping the router alive. If there is a | 671 // as long as there are refs keeping the router alive. If there is a |
| 670 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get | 672 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get |
| 671 // here and see that the endpoint has been marked as peer closed. | 673 // here and see that the endpoint has been marked as peer closed. |
| 672 if (!endpoint->peer_closed()) { | 674 if (!endpoint->peer_closed()) { |
| 673 if (endpoint->client()) | 675 if (endpoint->client()) |
| 674 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 676 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 675 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 677 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 676 } | 678 } |
| 677 | 679 |
| 678 // No need to trigger a ProcessTasks() because it is already on the stack. | 680 // No need to trigger a ProcessTasks() because it is already on the stack. |
| 679 | 681 |
| 680 return true; | 682 return true; |
| 681 } | 683 } |
| 682 | 684 |
| 683 void MultiplexRouter::OnPipeConnectionError() { | 685 void MultiplexRouter::OnPipeConnectionError() { |
| 684 DCHECK(thread_checker_.CalledOnValidThread()); | 686 DCHECK(thread_checker_.CalledOnValidThread()); |
| 685 | 687 |
| 686 scoped_refptr<MultiplexRouter> protector(this); | 688 scoped_refptr<MultiplexRouter> protector(this); |
| 687 MayAutoLock locker(lock_.get()); | 689 MayAutoLock locker(&lock_); |
| 688 | 690 |
| 689 encountered_error_ = true; | 691 encountered_error_ = true; |
| 690 | 692 |
| 691 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 693 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| 692 InterfaceEndpoint* endpoint = iter->second.get(); | 694 InterfaceEndpoint* endpoint = iter->second.get(); |
| 693 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 695 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 694 // because it may remove the corresponding value from the map. | 696 // because it may remove the corresponding value from the map. |
| 695 ++iter; | 697 ++iter; |
| 696 | 698 |
| 697 if (endpoint->client()) | 699 if (endpoint->client()) |
| (...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 810 InterfaceEndpointClient* client = endpoint->client(); | 812 InterfaceEndpointClient* client = endpoint->client(); |
| 811 base::Optional<DisconnectReason> disconnect_reason( | 813 base::Optional<DisconnectReason> disconnect_reason( |
| 812 endpoint->disconnect_reason()); | 814 endpoint->disconnect_reason()); |
| 813 | 815 |
| 814 { | 816 { |
| 815 // We must unlock before calling into |client| because it may call this | 817 // We must unlock before calling into |client| because it may call this |
| 816 // object within NotifyError(). Holding the lock will lead to deadlock. | 818 // object within NotifyError(). Holding the lock will lead to deadlock. |
| 817 // | 819 // |
| 818 // It is safe to call into |client| without the lock. Because |client| is | 820 // It is safe to call into |client| without the lock. Because |client| is |
| 819 // always accessed on the same thread, including DetachEndpointClient(). | 821 // always accessed on the same thread, including DetachEndpointClient(). |
| 820 MayAutoUnlock unlocker(lock_.get()); | 822 MayAutoUnlock unlocker(&lock_); |
| 821 client->NotifyError(disconnect_reason); | 823 client->NotifyError(disconnect_reason); |
| 822 } | 824 } |
| 823 return true; | 825 return true; |
| 824 } | 826 } |
| 825 | 827 |
| 826 bool MultiplexRouter::ProcessIncomingMessage( | 828 bool MultiplexRouter::ProcessIncomingMessage( |
| 827 Message* message, | 829 Message* message, |
| 828 ClientCallBehavior client_call_behavior, | 830 ClientCallBehavior client_call_behavior, |
| 829 base::SingleThreadTaskRunner* current_task_runner) { | 831 base::SingleThreadTaskRunner* current_task_runner) { |
| 830 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 832 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 831 DCHECK(!paused_); | 833 DCHECK(!paused_); |
| 832 DCHECK(message); | 834 DCHECK(message); |
| 833 AssertLockAcquired(); | 835 AssertLockAcquired(); |
| 834 | 836 |
| 835 if (message->IsNull()) { | 837 if (message->IsNull()) { |
| 836 // This is a sync message and has been processed during sync handle | 838 // This is a sync message and has been processed during sync handle |
| 837 // watching. | 839 // watching. |
| 838 return true; | 840 return true; |
| 839 } | 841 } |
| 840 | 842 |
| 841 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 843 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 842 bool result = false; | 844 bool result = false; |
| 843 | 845 |
| 844 { | 846 { |
| 845 MayAutoUnlock unlocker(lock_.get()); | 847 MayAutoUnlock unlocker(&lock_); |
| 846 result = control_message_handler_.Accept(message); | 848 result = control_message_handler_.Accept(message); |
| 847 } | 849 } |
| 848 | 850 |
| 849 if (!result) | 851 if (!result) |
| 850 RaiseErrorInNonTestingMode(); | 852 RaiseErrorInNonTestingMode(); |
| 851 | 853 |
| 852 return true; | 854 return true; |
| 853 } | 855 } |
| 854 | 856 |
| 855 InterfaceId id = message->interface_id(); | 857 InterfaceId id = message->interface_id(); |
| (...skipping 27 matching lines...) Expand all Loading... |
| 883 | 885 |
| 884 InterfaceEndpointClient* client = endpoint->client(); | 886 InterfaceEndpointClient* client = endpoint->client(); |
| 885 bool result = false; | 887 bool result = false; |
| 886 { | 888 { |
| 887 // We must unlock before calling into |client| because it may call this | 889 // We must unlock before calling into |client| because it may call this |
| 888 // object within HandleIncomingMessage(). Holding the lock will lead to | 890 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 889 // deadlock. | 891 // deadlock. |
| 890 // | 892 // |
| 891 // It is safe to call into |client| without the lock. Because |client| is | 893 // It is safe to call into |client| without the lock. Because |client| is |
| 892 // always accessed on the same thread, including DetachEndpointClient(). | 894 // always accessed on the same thread, including DetachEndpointClient(). |
| 893 MayAutoUnlock unlocker(lock_.get()); | 895 MayAutoUnlock unlocker(&lock_); |
| 894 result = client->HandleIncomingMessage(message); | 896 result = client->HandleIncomingMessage(message); |
| 895 } | 897 } |
| 896 if (!result) | 898 if (!result) |
| 897 RaiseErrorInNonTestingMode(); | 899 RaiseErrorInNonTestingMode(); |
| 898 | 900 |
| 899 return true; | 901 return true; |
| 900 } | 902 } |
| 901 | 903 |
| 902 void MultiplexRouter::MaybePostToProcessTasks( | 904 void MultiplexRouter::MaybePostToProcessTasks( |
| 903 base::SingleThreadTaskRunner* task_runner) { | 905 base::SingleThreadTaskRunner* task_runner) { |
| 904 AssertLockAcquired(); | 906 AssertLockAcquired(); |
| 905 if (posted_to_process_tasks_) | 907 if (posted_to_process_tasks_) |
| 906 return; | 908 return; |
| 907 | 909 |
| 908 posted_to_process_tasks_ = true; | 910 posted_to_process_tasks_ = true; |
| 909 posted_to_task_runner_ = task_runner; | 911 posted_to_task_runner_ = task_runner; |
| 910 task_runner->PostTask( | 912 task_runner->PostTask( |
| 911 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 913 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 912 } | 914 } |
| 913 | 915 |
| 914 void MultiplexRouter::LockAndCallProcessTasks() { | 916 void MultiplexRouter::LockAndCallProcessTasks() { |
| 915 // There is no need to hold a ref to this class in this case because this is | 917 // There is no need to hold a ref to this class in this case because this is |
| 916 // always called using base::Bind(), which holds a ref. | 918 // always called using base::Bind(), which holds a ref. |
| 917 MayAutoLock locker(lock_.get()); | 919 MayAutoLock locker(&lock_); |
| 918 posted_to_process_tasks_ = false; | 920 posted_to_process_tasks_ = false; |
| 919 scoped_refptr<base::SingleThreadTaskRunner> runner( | 921 scoped_refptr<base::SingleThreadTaskRunner> runner( |
| 920 std::move(posted_to_task_runner_)); | 922 std::move(posted_to_task_runner_)); |
| 921 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); | 923 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); |
| 922 } | 924 } |
| 923 | 925 |
| 924 void MultiplexRouter::UpdateEndpointStateMayRemove( | 926 void MultiplexRouter::UpdateEndpointStateMayRemove( |
| 925 InterfaceEndpoint* endpoint, | 927 InterfaceEndpoint* endpoint, |
| 926 EndpointStateUpdateType type) { | 928 EndpointStateUpdateType type) { |
| 927 switch (type) { | 929 switch (type) { |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 973 | 975 |
| 974 void MultiplexRouter::AssertLockAcquired() { | 976 void MultiplexRouter::AssertLockAcquired() { |
| 975 #if DCHECK_IS_ON() | 977 #if DCHECK_IS_ON() |
| 976 if (lock_) | 978 if (lock_) |
| 977 lock_->AssertAcquired(); | 979 lock_->AssertAcquired(); |
| 978 #endif | 980 #endif |
| 979 } | 981 } |
| 980 | 982 |
| 981 } // namespace internal | 983 } // namespace internal |
| 982 } // namespace mojo | 984 } // namespace mojo |
| OLD | NEW |