Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(226)

Side by Side Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2301123004: Mojo Channel: Fix deferred proxy dispatch; support paused channels (Closed)
Patch Set: . Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | ipc/ipc_sync_channel.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_mojo_bootstrap.h" 5 #include "ipc/ipc_mojo_bootstrap.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <map> 9 #include <map>
10 #include <memory> 10 #include <memory>
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
63 DCHECK(thread_checker_.CalledOnValidThread()); 63 DCHECK(thread_checker_.CalledOnValidThread());
64 DCHECK(task_runner_->BelongsToCurrentThread()); 64 DCHECK(task_runner_->BelongsToCurrentThread());
65 65
66 connector_.reset(new mojo::Connector( 66 connector_.reset(new mojo::Connector(
67 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, 67 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
68 task_runner_)); 68 task_runner_));
69 connector_->set_incoming_receiver(&filters_); 69 connector_->set_incoming_receiver(&filters_);
70 connector_->set_connection_error_handler( 70 connector_->set_connection_error_handler(
71 base::Bind(&ChannelAssociatedGroupController::OnPipeError, 71 base::Bind(&ChannelAssociatedGroupController::OnPipeError,
72 base::Unretained(this))); 72 base::Unretained(this)));
73 }
73 74
75 void Start() {
76 DCHECK(!started_);
77 started_ = true;
78 }
79
80 void FlushOutgoingMessages() {
74 std::vector<mojo::Message> outgoing_messages; 81 std::vector<mojo::Message> outgoing_messages;
75 std::swap(outgoing_messages, outgoing_messages_); 82 std::swap(outgoing_messages, outgoing_messages_);
76 for (auto& message : outgoing_messages) 83 for (auto& message : outgoing_messages)
77 SendMessage(&message); 84 SendMessage(&message);
78 } 85 }
79 86
80 void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender, 87 void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender,
81 mojom::ChannelAssociatedRequest* receiver) { 88 mojom::ChannelAssociatedRequest* receiver) {
82 mojo::InterfaceId sender_id, receiver_id; 89 mojo::InterfaceId sender_id, receiver_id;
83 if (set_interface_id_namespace_bit_) { 90 if (set_interface_id_namespace_bit_) {
(...skipping 375 matching lines...) Expand 10 before | Expand all | Expand 10 after
459 DCHECK(endpoint->closed()); 466 DCHECK(endpoint->closed());
460 MarkPeerClosedAndMaybeRemove(endpoint); 467 MarkPeerClosedAndMaybeRemove(endpoint);
461 } 468 }
462 469
463 DCHECK(endpoints_.empty()); 470 DCHECK(endpoints_.empty());
464 } 471 }
465 472
466 bool SendMessage(mojo::Message* message) { 473 bool SendMessage(mojo::Message* message) {
467 if (task_runner_->BelongsToCurrentThread()) { 474 if (task_runner_->BelongsToCurrentThread()) {
468 DCHECK(thread_checker_.CalledOnValidThread()); 475 DCHECK(thread_checker_.CalledOnValidThread());
469 if (!connector_) { 476 if (!connector_ || !started_) {
470 // Pipe may not be bound yet, so we queue the message. 477 // Pipe may not be bound yet or the channel may still be paused, so we
478 // queue the message.
471 outgoing_messages_.emplace_back(std::move(*message)); 479 outgoing_messages_.emplace_back(std::move(*message));
472 return true; 480 return true;
473 } 481 }
474 return connector_->Accept(message); 482 return connector_->Accept(message);
475 } else { 483 } else {
476 // We always post tasks to the master endpoint thread when called from the 484 // We always post tasks to the master endpoint thread when called from the
477 // proxy thread in order to simulate IPC::ChannelProxy::Send behavior. 485 // proxy thread in order to simulate IPC::ChannelProxy::Send behavior.
478 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 486 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
479 task_runner_->PostTask( 487 task_runner_->PostTask(
480 FROM_HERE, 488 FROM_HERE,
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
584 bool Accept(mojo::Message* message) override { 592 bool Accept(mojo::Message* message) override {
585 DCHECK(thread_checker_.CalledOnValidThread()); 593 DCHECK(thread_checker_.CalledOnValidThread());
586 594
587 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) 595 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
588 return control_message_handler_.Accept(message); 596 return control_message_handler_.Accept(message);
589 597
590 mojo::InterfaceId id = message->interface_id(); 598 mojo::InterfaceId id = message->interface_id();
591 DCHECK(mojo::IsValidInterfaceId(id)); 599 DCHECK(mojo::IsValidInterfaceId(id));
592 600
593 base::AutoLock locker(lock_); 601 base::AutoLock locker(lock_);
594 Endpoint* endpoint = GetEndpointForDispatch(id); 602 Endpoint* endpoint =
595 if (!endpoint) 603 GetEndpointForDispatch(id, false /* close_on_insert */);
596 return true; 604 mojo::InterfaceEndpointClient* client =
597 605 endpoint ? endpoint->client() : nullptr;
598 mojo::InterfaceEndpointClient* client = endpoint->client();
599 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { 606 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) {
600 // No client has been bound yet or the client runs tasks on another 607 // No client has been bound yet or the client runs tasks on another
601 // thread. We assume the other thread must always be the one on which 608 // thread. We assume the other thread must always be the one on which
602 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. 609 // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
603 // 610 //
604 // If the client is not yet bound, it must be bound by the time this task 611 // If the client is not yet bound, it must be bound by the time this task
605 // runs or else it's programmer error. 612 // runs or else it's programmer error.
606 DCHECK(proxy_task_runner_); 613 DCHECK(proxy_task_runner_);
607 614
608 if (message->has_flag(mojo::Message::kFlagIsSync)) { 615 if (message->has_flag(mojo::Message::kFlagIsSync)) {
(...skipping 26 matching lines...) Expand all
635 return client->HandleIncomingMessage(message); 642 return client->HandleIncomingMessage(message);
636 } 643 }
637 644
638 void AcceptOnProxyThread(mojo::Message message) { 645 void AcceptOnProxyThread(mojo::Message message) {
639 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 646 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
640 647
641 mojo::InterfaceId id = message.interface_id(); 648 mojo::InterfaceId id = message.interface_id();
642 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); 649 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
643 650
644 base::AutoLock locker(lock_); 651 base::AutoLock locker(lock_);
645 Endpoint* endpoint = GetEndpointForDispatch(id); 652 Endpoint* endpoint = GetEndpointForDispatch(id, true /* close_on_insert */);
646 if (!endpoint) 653 if (!endpoint)
647 return; 654 return;
648 655
649 mojo::InterfaceEndpointClient* client = endpoint->client(); 656 mojo::InterfaceEndpointClient* client = endpoint->client();
650 if (!client) 657 if (!client)
651 return; 658 return;
652 659
653 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 660 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
654 661
655 // Sync messages should never make their way to this method. 662 // Sync messages should never make their way to this method.
656 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); 663 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
657 664
658 bool result = false; 665 bool result = false;
659 { 666 {
660 base::AutoUnlock unlocker(lock_); 667 base::AutoUnlock unlocker(lock_);
661 result = client->HandleIncomingMessage(&message); 668 result = client->HandleIncomingMessage(&message);
662 } 669 }
663 670
664 if (!result) 671 if (!result)
665 RaiseError(); 672 RaiseError();
666 } 673 }
667 674
668 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { 675 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
669 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 676 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
670 677
671 base::AutoLock locker(lock_); 678 base::AutoLock locker(lock_);
672 Endpoint* endpoint = GetEndpointForDispatch(interface_id); 679 Endpoint* endpoint =
680 GetEndpointForDispatch(interface_id, true /* close_on_insert */);
673 if (!endpoint) 681 if (!endpoint)
674 return; 682 return;
675 683
676 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 684 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
677 mojo::Message message = endpoint->PopSyncMessage(message_id); 685 mojo::Message message = endpoint->PopSyncMessage(message_id);
678 686
679 // The message must have already been dequeued by the endpoint waking up 687 // The message must have already been dequeued by the endpoint waking up
680 // from a sync wait. Nothing to do. 688 // from a sync wait. Nothing to do.
681 if (message.IsNull()) 689 if (message.IsNull())
682 return; 690 return;
683 691
684 mojo::InterfaceEndpointClient* client = endpoint->client(); 692 mojo::InterfaceEndpointClient* client = endpoint->client();
685 if (!client) 693 if (!client)
686 return; 694 return;
687 695
688 bool result = false; 696 bool result = false;
689 { 697 {
690 base::AutoUnlock unlocker(lock_); 698 base::AutoUnlock unlocker(lock_);
691 result = client->HandleIncomingMessage(&message); 699 result = client->HandleIncomingMessage(&message);
692 } 700 }
693 701
694 if (!result) 702 if (!result)
695 RaiseError(); 703 RaiseError();
696 } 704 }
697 705
698 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { 706 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool close_on_insert) {
699 lock_.AssertAcquired(); 707 lock_.AssertAcquired();
700 bool inserted = false; 708 bool inserted = false;
701 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 709 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
702 if (inserted) { 710 if (inserted && close_on_insert) {
703 MarkClosedAndMaybeRemove(endpoint); 711 MarkClosedAndMaybeRemove(endpoint);
704 if (!mojo::IsMasterInterfaceId(id)) 712 if (!mojo::IsMasterInterfaceId(id))
705 control_message_proxy_.NotifyPeerEndpointClosed(id); 713 control_message_proxy_.NotifyPeerEndpointClosed(id);
706 return nullptr; 714 return nullptr;
707 } 715 }
708 716
709 if (endpoint->closed()) 717 if (endpoint->closed())
710 return nullptr; 718 return nullptr;
711 719
712 return endpoint; 720 return endpoint;
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
745 return true; 753 return true;
746 } 754 }
747 755
748 // Checked in places which must be run on the master endpoint's thread. 756 // Checked in places which must be run on the master endpoint's thread.
749 base::ThreadChecker thread_checker_; 757 base::ThreadChecker thread_checker_;
750 758
751 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 759 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
752 760
753 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; 761 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
754 const bool set_interface_id_namespace_bit_; 762 const bool set_interface_id_namespace_bit_;
763 bool started_ = false;
755 std::unique_ptr<mojo::Connector> connector_; 764 std::unique_ptr<mojo::Connector> connector_;
756 mojo::FilterChain filters_; 765 mojo::FilterChain filters_;
757 mojo::PipeControlMessageHandler control_message_handler_; 766 mojo::PipeControlMessageHandler control_message_handler_;
758 ControlMessageProxyThunk control_message_proxy_thunk_; 767 ControlMessageProxyThunk control_message_proxy_thunk_;
759 mojo::PipeControlMessageProxy control_message_proxy_; 768 mojo::PipeControlMessageProxy control_message_proxy_;
760 769
761 // Outgoing messages that were sent before this controller was bound to a 770 // Outgoing messages that were sent before this controller was bound to a
762 // real message pipe. 771 // real message pipe.
763 std::vector<mojo::Message> outgoing_messages_; 772 std::vector<mojo::Message> outgoing_messages_;
764 773
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
796 void Connect() override { 805 void Connect() override {
797 controller_->Bind(std::move(handle_)); 806 controller_->Bind(std::move(handle_));
798 807
799 IPC::mojom::ChannelAssociatedPtr sender; 808 IPC::mojom::ChannelAssociatedPtr sender;
800 IPC::mojom::ChannelAssociatedRequest receiver; 809 IPC::mojom::ChannelAssociatedRequest receiver;
801 controller_->CreateChannelEndpoints(&sender, &receiver); 810 controller_->CreateChannelEndpoints(&sender, &receiver);
802 811
803 delegate_->OnPipesAvailable(std::move(sender), std::move(receiver)); 812 delegate_->OnPipesAvailable(std::move(sender), std::move(receiver));
804 } 813 }
805 814
815 void Start() override {
816 controller_->Start();
817 }
818
819 void Flush() override {
820 controller_->FlushOutgoingMessages();
821 }
822
806 mojo::AssociatedGroup* GetAssociatedGroup() override { 823 mojo::AssociatedGroup* GetAssociatedGroup() override {
807 return associated_group_.get(); 824 return associated_group_.get();
808 } 825 }
809 826
810 scoped_refptr<ChannelAssociatedGroupController> controller_; 827 scoped_refptr<ChannelAssociatedGroupController> controller_;
811 828
812 mojo::ScopedMessagePipeHandle handle_; 829 mojo::ScopedMessagePipeHandle handle_;
813 Delegate* delegate_; 830 Delegate* delegate_;
814 std::unique_ptr<mojo::AssociatedGroup> associated_group_; 831 std::unique_ptr<mojo::AssociatedGroup> associated_group_;
815 832
816 DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl); 833 DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl);
817 }; 834 };
818 835
819 } // namespace 836 } // namespace
820 837
821 // static 838 // static
822 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( 839 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
823 mojo::ScopedMessagePipeHandle handle, 840 mojo::ScopedMessagePipeHandle handle,
824 Channel::Mode mode, 841 Channel::Mode mode,
825 Delegate* delegate, 842 Delegate* delegate,
826 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 843 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
827 return base::MakeUnique<MojoBootstrapImpl>( 844 return base::MakeUnique<MojoBootstrapImpl>(
828 std::move(handle), delegate, 845 std::move(handle), delegate,
829 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, 846 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
830 ipc_task_runner)); 847 ipc_task_runner));
831 } 848 }
832 849
833 } // namespace IPC 850 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | ipc/ipc_sync_channel.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698