| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 104 } | 104 } |
| 105 | 105 |
| 106 // --------------------------------------------------------------------------- | 106 // --------------------------------------------------------------------------- |
| 107 // The following public methods (i.e., InterfaceEndpointController | 107 // The following public methods (i.e., InterfaceEndpointController |
| 108 // implementation) are called by the client on the same thread as the | 108 // implementation) are called by the client on the same thread as the |
| 109 // AttachClient() call. They are called outside of the router's lock. | 109 // AttachClient() call. They are called outside of the router's lock. |
| 110 | 110 |
| 111 bool SendMessage(Message* message) override { | 111 bool SendMessage(Message* message) override { |
| 112 DCHECK(task_runner_->BelongsToCurrentThread()); | 112 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 113 message->set_interface_id(id_); | 113 message->set_interface_id(id_); |
| 114 return router_->connector_.Accept(message); | 114 |
| 115 return router_->connector_.Accept(message).Succeeded(); |
| 115 } | 116 } |
| 116 | 117 |
| 117 void AllowWokenUpBySyncWatchOnSameThread() override { | 118 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 118 DCHECK(task_runner_->BelongsToCurrentThread()); | 119 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 119 | 120 |
| 120 EnsureSyncWatcherExists(); | 121 EnsureSyncWatcherExists(); |
| 121 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 122 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 122 } | 123 } |
| 123 | 124 |
| 124 bool SyncWatch(const bool* should_stop) override { | 125 bool SyncWatch(const bool* should_stop) override { |
| (...skipping 298 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 423 | 424 |
| 424 DCHECK(IsValidInterfaceId(id)); | 425 DCHECK(IsValidInterfaceId(id)); |
| 425 | 426 |
| 426 base::AutoLock locker(lock_); | 427 base::AutoLock locker(lock_); |
| 427 DCHECK(ContainsKey(endpoints_, id)); | 428 DCHECK(ContainsKey(endpoints_, id)); |
| 428 | 429 |
| 429 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 430 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 430 endpoint->DetachClient(); | 431 endpoint->DetachClient(); |
| 431 } | 432 } |
| 432 | 433 |
| 433 void MultiplexRouter::RaiseError() { | 434 void MultiplexRouter::RaiseError(Result error) { |
| 434 if (task_runner_->BelongsToCurrentThread()) { | 435 if (task_runner_->BelongsToCurrentThread()) { |
| 435 connector_.RaiseError(); | 436 connector_.RaiseError(std::move(error)); |
| 436 } else { | 437 } else { |
| 437 task_runner_->PostTask(FROM_HERE, | 438 task_runner_->PostTask(FROM_HERE, |
| 438 base::Bind(&MultiplexRouter::RaiseError, this)); | 439 base::Bind(&MultiplexRouter::RaiseError, this, |
| 440 base::Passed(&error))); |
| 439 } | 441 } |
| 440 } | 442 } |
| 441 | 443 |
| 442 std::unique_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { | 444 std::unique_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { |
| 443 std::unique_ptr<AssociatedGroup> group(new AssociatedGroup); | 445 std::unique_ptr<AssociatedGroup> group(new AssociatedGroup); |
| 444 group->router_ = this; | 446 group->router_ = this; |
| 445 return group; | 447 return group; |
| 446 } | 448 } |
| 447 | 449 |
| 448 // static | 450 // static |
| (...skipping 15 matching lines...) Expand all Loading... |
| 464 base::AutoLock locker(lock_); | 466 base::AutoLock locker(lock_); |
| 465 | 467 |
| 466 if (endpoints_.size() > 1) | 468 if (endpoints_.size() > 1) |
| 467 return true; | 469 return true; |
| 468 if (endpoints_.size() == 0) | 470 if (endpoints_.size() == 0) |
| 469 return false; | 471 return false; |
| 470 | 472 |
| 471 return !ContainsKey(endpoints_, kMasterInterfaceId); | 473 return !ContainsKey(endpoints_, kMasterInterfaceId); |
| 472 } | 474 } |
| 473 | 475 |
| 476 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) { |
| 477 DCHECK(thread_checker_.CalledOnValidThread()); |
| 478 control_message_handler_.set_interface_name(name); |
| 479 header_validator_.set_debug_info(name + " master interface header validator"); |
| 480 } |
| 481 |
| 474 void MultiplexRouter::EnableTestingMode() { | 482 void MultiplexRouter::EnableTestingMode() { |
| 475 DCHECK(thread_checker_.CalledOnValidThread()); | 483 DCHECK(thread_checker_.CalledOnValidThread()); |
| 476 base::AutoLock locker(lock_); | 484 base::AutoLock locker(lock_); |
| 477 | 485 |
| 478 testing_mode_ = true; | 486 testing_mode_ = true; |
| 479 connector_.set_enforce_errors_from_incoming_receiver(false); | 487 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 480 } | 488 } |
| 481 | 489 |
| 482 bool MultiplexRouter::Accept(Message* message) { | 490 MessageReceiver::Result MultiplexRouter::Accept(Message* message) { |
| 483 DCHECK(thread_checker_.CalledOnValidThread()); | 491 DCHECK(thread_checker_.CalledOnValidThread()); |
| 484 | 492 |
| 485 scoped_refptr<MultiplexRouter> protector(this); | 493 scoped_refptr<MultiplexRouter> protector(this); |
| 486 base::AutoLock locker(lock_); | 494 base::AutoLock locker(lock_); |
| 487 | 495 |
| 488 ClientCallBehavior client_call_behavior = | 496 ClientCallBehavior client_call_behavior = |
| 489 connector_.during_sync_handle_watcher_callback() | 497 connector_.during_sync_handle_watcher_callback() |
| 490 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 498 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 491 : ALLOW_DIRECT_CLIENT_CALLS; | 499 : ALLOW_DIRECT_CLIENT_CALLS; |
| 492 | 500 |
| (...skipping 14 matching lines...) Expand all Loading... |
| 507 if (iter != endpoints_.end()) | 515 if (iter != endpoints_.end()) |
| 508 iter->second->SignalSyncMessageEvent(); | 516 iter->second->SignalSyncMessageEvent(); |
| 509 } | 517 } |
| 510 } else if (!tasks_.empty()) { | 518 } else if (!tasks_.empty()) { |
| 511 // Processing the message may result in new tasks (for error notification) | 519 // Processing the message may result in new tasks (for error notification) |
| 512 // being added to the queue. In this case, we have to attempt to process the | 520 // being added to the queue. In this case, we have to attempt to process the |
| 513 // tasks. | 521 // tasks. |
| 514 ProcessTasks(client_call_behavior, connector_.task_runner()); | 522 ProcessTasks(client_call_behavior, connector_.task_runner()); |
| 515 } | 523 } |
| 516 | 524 |
| 517 // Always return true. If we see errors during message processing, we will | 525 // Always succeed. If we see errors during message processing, we will |
| 518 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 526 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 519 return true; | 527 return Result::ForSuccess(); |
| 520 } | 528 } |
| 521 | 529 |
| 522 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 530 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| 523 lock_.AssertAcquired(); | 531 lock_.AssertAcquired(); |
| 524 | 532 |
| 525 if (IsMasterInterfaceId(id)) | 533 if (IsMasterInterfaceId(id)) |
| 526 return false; | 534 return false; |
| 527 | 535 |
| 528 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 536 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 529 | 537 |
| (...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 699 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 707 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 700 lock_.AssertAcquired(); | 708 lock_.AssertAcquired(); |
| 701 | 709 |
| 702 if (!message) { | 710 if (!message) { |
| 703 // This is a sync message and has been processed during sync handle | 711 // This is a sync message and has been processed during sync handle |
| 704 // watching. | 712 // watching. |
| 705 return true; | 713 return true; |
| 706 } | 714 } |
| 707 | 715 |
| 708 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 716 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 709 if (!control_message_handler_.Accept(message)) | 717 Result result = control_message_handler_.Accept(message); |
| 710 RaiseErrorInNonTestingMode(); | 718 if (!result.Succeeded()) |
| 719 RaiseErrorInNonTestingMode(std::move(result)); |
| 711 return true; | 720 return true; |
| 712 } | 721 } |
| 713 | 722 |
| 714 InterfaceId id = message->interface_id(); | 723 InterfaceId id = message->interface_id(); |
| 715 DCHECK(IsValidInterfaceId(id)); | 724 DCHECK(IsValidInterfaceId(id)); |
| 716 | 725 |
| 717 bool inserted = false; | 726 bool inserted = false; |
| 718 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 727 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 719 if (inserted) { | 728 if (inserted) { |
| 720 // Currently, it is legitimate to receive messages for an endpoint | 729 // Currently, it is legitimate to receive messages for an endpoint |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 756 } | 765 } |
| 757 | 766 |
| 758 if (!can_direct_call) { | 767 if (!can_direct_call) { |
| 759 MaybePostToProcessTasks(endpoint->task_runner()); | 768 MaybePostToProcessTasks(endpoint->task_runner()); |
| 760 return false; | 769 return false; |
| 761 } | 770 } |
| 762 | 771 |
| 763 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 772 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 764 | 773 |
| 765 InterfaceEndpointClient* client = endpoint->client(); | 774 InterfaceEndpointClient* client = endpoint->client(); |
| 766 bool result = false; | 775 Result result(Result::Type::UNKNOWN_ERROR); |
| 767 { | 776 { |
| 768 // We must unlock before calling into |client| because it may call this | 777 // We must unlock before calling into |client| because it may call this |
| 769 // object within HandleIncomingMessage(). Holding the lock will lead to | 778 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 770 // deadlock. | 779 // deadlock. |
| 771 // | 780 // |
| 772 // It is safe to call into |client| without the lock. Because |client| is | 781 // It is safe to call into |client| without the lock. Because |client| is |
| 773 // always accessed on the same thread, including DetachEndpointClient(). | 782 // always accessed on the same thread, including DetachEndpointClient(). |
| 774 base::AutoUnlock unlocker(lock_); | 783 base::AutoUnlock unlocker(lock_); |
| 775 result = client->HandleIncomingMessage(message); | 784 result = client->HandleIncomingMessage(message); |
| 776 } | 785 } |
| 777 if (!result) | 786 if (!result.Succeeded()) |
| 778 RaiseErrorInNonTestingMode(); | 787 RaiseErrorInNonTestingMode(std::move(result)); |
| 779 | 788 |
| 780 return true; | 789 return true; |
| 781 } | 790 } |
| 782 | 791 |
| 783 void MultiplexRouter::MaybePostToProcessTasks( | 792 void MultiplexRouter::MaybePostToProcessTasks( |
| 784 base::SingleThreadTaskRunner* task_runner) { | 793 base::SingleThreadTaskRunner* task_runner) { |
| 785 lock_.AssertAcquired(); | 794 lock_.AssertAcquired(); |
| 786 if (posted_to_process_tasks_) | 795 if (posted_to_process_tasks_) |
| 787 return; | 796 return; |
| 788 | 797 |
| (...skipping 24 matching lines...) Expand all Loading... |
| 813 endpoint->set_peer_closed(); | 822 endpoint->set_peer_closed(); |
| 814 // If the interface endpoint is performing a sync watch, this makes sure | 823 // If the interface endpoint is performing a sync watch, this makes sure |
| 815 // it is notified and eventually exits the sync watch. | 824 // it is notified and eventually exits the sync watch. |
| 816 endpoint->SignalSyncMessageEvent(); | 825 endpoint->SignalSyncMessageEvent(); |
| 817 break; | 826 break; |
| 818 } | 827 } |
| 819 if (endpoint->closed() && endpoint->peer_closed()) | 828 if (endpoint->closed() && endpoint->peer_closed()) |
| 820 endpoints_.erase(endpoint->id()); | 829 endpoints_.erase(endpoint->id()); |
| 821 } | 830 } |
| 822 | 831 |
| 823 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 832 void MultiplexRouter::RaiseErrorInNonTestingMode(Result error) { |
| 824 lock_.AssertAcquired(); | 833 lock_.AssertAcquired(); |
| 825 if (!testing_mode_) | 834 if (!testing_mode_) |
| 826 RaiseError(); | 835 RaiseError(std::move(error)); |
| 827 } | 836 } |
| 828 | 837 |
| 829 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( | 838 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( |
| 830 InterfaceId id, | 839 InterfaceId id, |
| 831 bool* inserted) { | 840 bool* inserted) { |
| 832 lock_.AssertAcquired(); | 841 lock_.AssertAcquired(); |
| 833 // Either |inserted| is nullptr or it points to a boolean initialized as | 842 // Either |inserted| is nullptr or it points to a boolean initialized as |
| 834 // false. | 843 // false. |
| 835 DCHECK(!inserted || !*inserted); | 844 DCHECK(!inserted || !*inserted); |
| 836 | 845 |
| 837 auto iter = endpoints_.find(id); | 846 auto iter = endpoints_.find(id); |
| 838 InterfaceEndpoint* endpoint; | 847 InterfaceEndpoint* endpoint; |
| 839 if (iter == endpoints_.end()) { | 848 if (iter == endpoints_.end()) { |
| 840 endpoint = new InterfaceEndpoint(this, id); | 849 endpoint = new InterfaceEndpoint(this, id); |
| 841 endpoints_[id] = endpoint; | 850 endpoints_[id] = endpoint; |
| 842 if (inserted) | 851 if (inserted) |
| 843 *inserted = true; | 852 *inserted = true; |
| 844 } else { | 853 } else { |
| 845 endpoint = iter->second.get(); | 854 endpoint = iter->second.get(); |
| 846 } | 855 } |
| 847 | 856 |
| 848 return endpoint; | 857 return endpoint; |
| 849 } | 858 } |
| 850 | 859 |
| 851 } // namespace internal | 860 } // namespace internal |
| 852 } // namespace mojo | 861 } // namespace mojo |
| OLD | NEW |