OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <map> | 9 #include <map> |
10 #include <memory> | 10 #include <memory> |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
63 DCHECK(thread_checker_.CalledOnValidThread()); | 63 DCHECK(thread_checker_.CalledOnValidThread()); |
64 DCHECK(task_runner_->BelongsToCurrentThread()); | 64 DCHECK(task_runner_->BelongsToCurrentThread()); |
65 | 65 |
66 connector_.reset(new mojo::Connector( | 66 connector_.reset(new mojo::Connector( |
67 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, | 67 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, |
68 task_runner_)); | 68 task_runner_)); |
69 connector_->set_incoming_receiver(&filters_); | 69 connector_->set_incoming_receiver(&filters_); |
70 connector_->set_connection_error_handler( | 70 connector_->set_connection_error_handler( |
71 base::Bind(&ChannelAssociatedGroupController::OnPipeError, | 71 base::Bind(&ChannelAssociatedGroupController::OnPipeError, |
72 base::Unretained(this))); | 72 base::Unretained(this))); |
| 73 } |
73 | 74 |
| 75 void Start() { |
| 76 DCHECK(!started_); |
| 77 started_ = true; |
| 78 } |
| 79 |
| 80 void FlushOutgoingMessages() { |
74 std::vector<mojo::Message> outgoing_messages; | 81 std::vector<mojo::Message> outgoing_messages; |
75 std::swap(outgoing_messages, outgoing_messages_); | 82 std::swap(outgoing_messages, outgoing_messages_); |
76 for (auto& message : outgoing_messages) | 83 for (auto& message : outgoing_messages) |
77 SendMessage(&message); | 84 SendMessage(&message); |
78 } | 85 } |
79 | 86 |
80 void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender, | 87 void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender, |
81 mojom::ChannelAssociatedRequest* receiver) { | 88 mojom::ChannelAssociatedRequest* receiver) { |
82 mojo::InterfaceId sender_id, receiver_id; | 89 mojo::InterfaceId sender_id, receiver_id; |
83 if (set_interface_id_namespace_bit_) { | 90 if (set_interface_id_namespace_bit_) { |
(...skipping 375 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
459 DCHECK(endpoint->closed()); | 466 DCHECK(endpoint->closed()); |
460 MarkPeerClosedAndMaybeRemove(endpoint); | 467 MarkPeerClosedAndMaybeRemove(endpoint); |
461 } | 468 } |
462 | 469 |
463 DCHECK(endpoints_.empty()); | 470 DCHECK(endpoints_.empty()); |
464 } | 471 } |
465 | 472 |
466 bool SendMessage(mojo::Message* message) { | 473 bool SendMessage(mojo::Message* message) { |
467 if (task_runner_->BelongsToCurrentThread()) { | 474 if (task_runner_->BelongsToCurrentThread()) { |
468 DCHECK(thread_checker_.CalledOnValidThread()); | 475 DCHECK(thread_checker_.CalledOnValidThread()); |
469 if (!connector_) { | 476 if (!connector_ || !started_) { |
470 // Pipe may not be bound yet, so we queue the message. | 477 // Pipe may not be bound yet or the channel may still be paused, so we |
| 478 // queue the message. |
471 outgoing_messages_.emplace_back(std::move(*message)); | 479 outgoing_messages_.emplace_back(std::move(*message)); |
472 return true; | 480 return true; |
473 } | 481 } |
474 return connector_->Accept(message); | 482 return connector_->Accept(message); |
475 } else { | 483 } else { |
476 // We always post tasks to the master endpoint thread when called from the | 484 // We always post tasks to the master endpoint thread when called from the |
477 // proxy thread in order to simulate IPC::ChannelProxy::Send behavior. | 485 // proxy thread in order to simulate IPC::ChannelProxy::Send behavior. |
478 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 486 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
479 task_runner_->PostTask( | 487 task_runner_->PostTask( |
480 FROM_HERE, | 488 FROM_HERE, |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
584 bool Accept(mojo::Message* message) override { | 592 bool Accept(mojo::Message* message) override { |
585 DCHECK(thread_checker_.CalledOnValidThread()); | 593 DCHECK(thread_checker_.CalledOnValidThread()); |
586 | 594 |
587 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) | 595 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) |
588 return control_message_handler_.Accept(message); | 596 return control_message_handler_.Accept(message); |
589 | 597 |
590 mojo::InterfaceId id = message->interface_id(); | 598 mojo::InterfaceId id = message->interface_id(); |
591 DCHECK(mojo::IsValidInterfaceId(id)); | 599 DCHECK(mojo::IsValidInterfaceId(id)); |
592 | 600 |
593 base::AutoLock locker(lock_); | 601 base::AutoLock locker(lock_); |
594 Endpoint* endpoint = GetEndpointForDispatch(id); | 602 Endpoint* endpoint = |
595 if (!endpoint) | 603 GetEndpointForDispatch(id, false /* close_on_insert */); |
596 return true; | 604 mojo::InterfaceEndpointClient* client = |
597 | 605 endpoint ? endpoint->client() : nullptr; |
598 mojo::InterfaceEndpointClient* client = endpoint->client(); | |
599 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { | 606 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { |
600 // No client has been bound yet or the client runs tasks on another | 607 // No client has been bound yet or the client runs tasks on another |
601 // thread. We assume the other thread must always be the one on which | 608 // thread. We assume the other thread must always be the one on which |
602 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 609 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
603 // | 610 // |
604 // If the client is not yet bound, it must be bound by the time this task | 611 // If the client is not yet bound, it must be bound by the time this task |
605 // runs or else it's programmer error. | 612 // runs or else it's programmer error. |
606 DCHECK(proxy_task_runner_); | 613 DCHECK(proxy_task_runner_); |
607 | 614 |
608 if (message->has_flag(mojo::Message::kFlagIsSync)) { | 615 if (message->has_flag(mojo::Message::kFlagIsSync)) { |
(...skipping 26 matching lines...) Expand all Loading... |
635 return client->HandleIncomingMessage(message); | 642 return client->HandleIncomingMessage(message); |
636 } | 643 } |
637 | 644 |
638 void AcceptOnProxyThread(mojo::Message message) { | 645 void AcceptOnProxyThread(mojo::Message message) { |
639 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 646 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
640 | 647 |
641 mojo::InterfaceId id = message.interface_id(); | 648 mojo::InterfaceId id = message.interface_id(); |
642 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); | 649 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
643 | 650 |
644 base::AutoLock locker(lock_); | 651 base::AutoLock locker(lock_); |
645 Endpoint* endpoint = GetEndpointForDispatch(id); | 652 Endpoint* endpoint = GetEndpointForDispatch(id, true /* close_on_insert */); |
646 if (!endpoint) | 653 if (!endpoint) |
647 return; | 654 return; |
648 | 655 |
649 mojo::InterfaceEndpointClient* client = endpoint->client(); | 656 mojo::InterfaceEndpointClient* client = endpoint->client(); |
650 if (!client) | 657 if (!client) |
651 return; | 658 return; |
652 | 659 |
653 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 660 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
654 | 661 |
655 // Sync messages should never make their way to this method. | 662 // Sync messages should never make their way to this method. |
656 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); | 663 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); |
657 | 664 |
658 bool result = false; | 665 bool result = false; |
659 { | 666 { |
660 base::AutoUnlock unlocker(lock_); | 667 base::AutoUnlock unlocker(lock_); |
661 result = client->HandleIncomingMessage(&message); | 668 result = client->HandleIncomingMessage(&message); |
662 } | 669 } |
663 | 670 |
664 if (!result) | 671 if (!result) |
665 RaiseError(); | 672 RaiseError(); |
666 } | 673 } |
667 | 674 |
668 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { | 675 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { |
669 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 676 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
670 | 677 |
671 base::AutoLock locker(lock_); | 678 base::AutoLock locker(lock_); |
672 Endpoint* endpoint = GetEndpointForDispatch(interface_id); | 679 Endpoint* endpoint = |
| 680 GetEndpointForDispatch(interface_id, true /* close_on_insert */); |
673 if (!endpoint) | 681 if (!endpoint) |
674 return; | 682 return; |
675 | 683 |
676 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 684 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
677 mojo::Message message = endpoint->PopSyncMessage(message_id); | 685 mojo::Message message = endpoint->PopSyncMessage(message_id); |
678 | 686 |
679 // The message must have already been dequeued by the endpoint waking up | 687 // The message must have already been dequeued by the endpoint waking up |
680 // from a sync wait. Nothing to do. | 688 // from a sync wait. Nothing to do. |
681 if (message.IsNull()) | 689 if (message.IsNull()) |
682 return; | 690 return; |
683 | 691 |
684 mojo::InterfaceEndpointClient* client = endpoint->client(); | 692 mojo::InterfaceEndpointClient* client = endpoint->client(); |
685 if (!client) | 693 if (!client) |
686 return; | 694 return; |
687 | 695 |
688 bool result = false; | 696 bool result = false; |
689 { | 697 { |
690 base::AutoUnlock unlocker(lock_); | 698 base::AutoUnlock unlocker(lock_); |
691 result = client->HandleIncomingMessage(&message); | 699 result = client->HandleIncomingMessage(&message); |
692 } | 700 } |
693 | 701 |
694 if (!result) | 702 if (!result) |
695 RaiseError(); | 703 RaiseError(); |
696 } | 704 } |
697 | 705 |
698 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { | 706 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool close_on_insert) { |
699 lock_.AssertAcquired(); | 707 lock_.AssertAcquired(); |
700 bool inserted = false; | 708 bool inserted = false; |
701 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 709 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
702 if (inserted) { | 710 if (inserted && close_on_insert) { |
703 MarkClosedAndMaybeRemove(endpoint); | 711 MarkClosedAndMaybeRemove(endpoint); |
704 if (!mojo::IsMasterInterfaceId(id)) | 712 if (!mojo::IsMasterInterfaceId(id)) |
705 control_message_proxy_.NotifyPeerEndpointClosed(id); | 713 control_message_proxy_.NotifyPeerEndpointClosed(id); |
706 return nullptr; | 714 return nullptr; |
707 } | 715 } |
708 | 716 |
709 if (endpoint->closed()) | 717 if (endpoint->closed()) |
710 return nullptr; | 718 return nullptr; |
711 | 719 |
712 return endpoint; | 720 return endpoint; |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
745 return true; | 753 return true; |
746 } | 754 } |
747 | 755 |
748 // Checked in places which must be run on the master endpoint's thread. | 756 // Checked in places which must be run on the master endpoint's thread. |
749 base::ThreadChecker thread_checker_; | 757 base::ThreadChecker thread_checker_; |
750 | 758 |
751 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 759 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
752 | 760 |
753 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; | 761 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; |
754 const bool set_interface_id_namespace_bit_; | 762 const bool set_interface_id_namespace_bit_; |
| 763 bool started_ = false; |
755 std::unique_ptr<mojo::Connector> connector_; | 764 std::unique_ptr<mojo::Connector> connector_; |
756 mojo::FilterChain filters_; | 765 mojo::FilterChain filters_; |
757 mojo::PipeControlMessageHandler control_message_handler_; | 766 mojo::PipeControlMessageHandler control_message_handler_; |
758 ControlMessageProxyThunk control_message_proxy_thunk_; | 767 ControlMessageProxyThunk control_message_proxy_thunk_; |
759 mojo::PipeControlMessageProxy control_message_proxy_; | 768 mojo::PipeControlMessageProxy control_message_proxy_; |
760 | 769 |
761 // Outgoing messages that were sent before this controller was bound to a | 770 // Outgoing messages that were sent before this controller was bound to a |
762 // real message pipe. | 771 // real message pipe. |
763 std::vector<mojo::Message> outgoing_messages_; | 772 std::vector<mojo::Message> outgoing_messages_; |
764 | 773 |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
796 void Connect() override { | 805 void Connect() override { |
797 controller_->Bind(std::move(handle_)); | 806 controller_->Bind(std::move(handle_)); |
798 | 807 |
799 IPC::mojom::ChannelAssociatedPtr sender; | 808 IPC::mojom::ChannelAssociatedPtr sender; |
800 IPC::mojom::ChannelAssociatedRequest receiver; | 809 IPC::mojom::ChannelAssociatedRequest receiver; |
801 controller_->CreateChannelEndpoints(&sender, &receiver); | 810 controller_->CreateChannelEndpoints(&sender, &receiver); |
802 | 811 |
803 delegate_->OnPipesAvailable(std::move(sender), std::move(receiver)); | 812 delegate_->OnPipesAvailable(std::move(sender), std::move(receiver)); |
804 } | 813 } |
805 | 814 |
| 815 void Start() override { |
| 816 controller_->Start(); |
| 817 } |
| 818 |
| 819 void Flush() override { |
| 820 controller_->FlushOutgoingMessages(); |
| 821 } |
| 822 |
806 mojo::AssociatedGroup* GetAssociatedGroup() override { | 823 mojo::AssociatedGroup* GetAssociatedGroup() override { |
807 return associated_group_.get(); | 824 return associated_group_.get(); |
808 } | 825 } |
809 | 826 |
810 scoped_refptr<ChannelAssociatedGroupController> controller_; | 827 scoped_refptr<ChannelAssociatedGroupController> controller_; |
811 | 828 |
812 mojo::ScopedMessagePipeHandle handle_; | 829 mojo::ScopedMessagePipeHandle handle_; |
813 Delegate* delegate_; | 830 Delegate* delegate_; |
814 std::unique_ptr<mojo::AssociatedGroup> associated_group_; | 831 std::unique_ptr<mojo::AssociatedGroup> associated_group_; |
815 | 832 |
816 DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl); | 833 DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl); |
817 }; | 834 }; |
818 | 835 |
819 } // namespace | 836 } // namespace |
820 | 837 |
821 // static | 838 // static |
822 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( | 839 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( |
823 mojo::ScopedMessagePipeHandle handle, | 840 mojo::ScopedMessagePipeHandle handle, |
824 Channel::Mode mode, | 841 Channel::Mode mode, |
825 Delegate* delegate, | 842 Delegate* delegate, |
826 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 843 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
827 return base::MakeUnique<MojoBootstrapImpl>( | 844 return base::MakeUnique<MojoBootstrapImpl>( |
828 std::move(handle), delegate, | 845 std::move(handle), delegate, |
829 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, | 846 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, |
830 ipc_task_runner)); | 847 ipc_task_runner)); |
831 } | 848 } |
832 | 849 |
833 } // namespace IPC | 850 } // namespace IPC |
OLD | NEW |