| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <map> | 9 #include <map> |
| 10 #include <memory> | 10 #include <memory> |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 63 DCHECK(thread_checker_.CalledOnValidThread()); | 63 DCHECK(thread_checker_.CalledOnValidThread()); |
| 64 DCHECK(task_runner_->BelongsToCurrentThread()); | 64 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 65 | 65 |
| 66 connector_.reset(new mojo::Connector( | 66 connector_.reset(new mojo::Connector( |
| 67 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, | 67 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, |
| 68 task_runner_)); | 68 task_runner_)); |
| 69 connector_->set_incoming_receiver(&filters_); | 69 connector_->set_incoming_receiver(&filters_); |
| 70 connector_->set_connection_error_handler( | 70 connector_->set_connection_error_handler( |
| 71 base::Bind(&ChannelAssociatedGroupController::OnPipeError, | 71 base::Bind(&ChannelAssociatedGroupController::OnPipeError, |
| 72 base::Unretained(this))); | 72 base::Unretained(this))); |
| 73 } |
| 73 | 74 |
| 75 void Start() { |
| 76 DCHECK(!started_); |
| 77 started_ = true; |
| 78 } |
| 79 |
| 80 void FlushOutgoingMessages() { |
| 74 std::vector<mojo::Message> outgoing_messages; | 81 std::vector<mojo::Message> outgoing_messages; |
| 75 std::swap(outgoing_messages, outgoing_messages_); | 82 std::swap(outgoing_messages, outgoing_messages_); |
| 76 for (auto& message : outgoing_messages) | 83 for (auto& message : outgoing_messages) |
| 77 SendMessage(&message); | 84 SendMessage(&message); |
| 78 } | 85 } |
| 79 | 86 |
| 80 void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender, | 87 void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender, |
| 81 mojom::ChannelAssociatedRequest* receiver) { | 88 mojom::ChannelAssociatedRequest* receiver) { |
| 82 mojo::InterfaceId sender_id, receiver_id; | 89 mojo::InterfaceId sender_id, receiver_id; |
| 83 if (set_interface_id_namespace_bit_) { | 90 if (set_interface_id_namespace_bit_) { |
| (...skipping 375 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 459 DCHECK(endpoint->closed()); | 466 DCHECK(endpoint->closed()); |
| 460 MarkPeerClosedAndMaybeRemove(endpoint); | 467 MarkPeerClosedAndMaybeRemove(endpoint); |
| 461 } | 468 } |
| 462 | 469 |
| 463 DCHECK(endpoints_.empty()); | 470 DCHECK(endpoints_.empty()); |
| 464 } | 471 } |
| 465 | 472 |
| 466 bool SendMessage(mojo::Message* message) { | 473 bool SendMessage(mojo::Message* message) { |
| 467 if (task_runner_->BelongsToCurrentThread()) { | 474 if (task_runner_->BelongsToCurrentThread()) { |
| 468 DCHECK(thread_checker_.CalledOnValidThread()); | 475 DCHECK(thread_checker_.CalledOnValidThread()); |
| 469 if (!connector_) { | 476 if (!connector_ || !started_) { |
| 470 // Pipe may not be bound yet, so we queue the message. | 477 // Pipe may not be bound yet or the channel may still be paused, so we |
| 478 // queue the message. |
| 471 outgoing_messages_.emplace_back(std::move(*message)); | 479 outgoing_messages_.emplace_back(std::move(*message)); |
| 472 return true; | 480 return true; |
| 473 } | 481 } |
| 474 return connector_->Accept(message); | 482 return connector_->Accept(message); |
| 475 } else { | 483 } else { |
| 476 // We always post tasks to the master endpoint thread when called from the | 484 // We always post tasks to the master endpoint thread when called from the |
| 477 // proxy thread in order to simulate IPC::ChannelProxy::Send behavior. | 485 // proxy thread in order to simulate IPC::ChannelProxy::Send behavior. |
| 478 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 486 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 479 task_runner_->PostTask( | 487 task_runner_->PostTask( |
| 480 FROM_HERE, | 488 FROM_HERE, |
| (...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 584 bool Accept(mojo::Message* message) override { | 592 bool Accept(mojo::Message* message) override { |
| 585 DCHECK(thread_checker_.CalledOnValidThread()); | 593 DCHECK(thread_checker_.CalledOnValidThread()); |
| 586 | 594 |
| 587 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) | 595 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) |
| 588 return control_message_handler_.Accept(message); | 596 return control_message_handler_.Accept(message); |
| 589 | 597 |
| 590 mojo::InterfaceId id = message->interface_id(); | 598 mojo::InterfaceId id = message->interface_id(); |
| 591 DCHECK(mojo::IsValidInterfaceId(id)); | 599 DCHECK(mojo::IsValidInterfaceId(id)); |
| 592 | 600 |
| 593 base::AutoLock locker(lock_); | 601 base::AutoLock locker(lock_); |
| 594 Endpoint* endpoint = GetEndpointForDispatch(id); | 602 Endpoint* endpoint = |
| 595 if (!endpoint) | 603 GetEndpointForDispatch(id, false /* close_on_insert */); |
| 596 return true; | 604 mojo::InterfaceEndpointClient* client = |
| 597 | 605 endpoint ? endpoint->client() : nullptr; |
| 598 mojo::InterfaceEndpointClient* client = endpoint->client(); | |
| 599 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { | 606 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { |
| 600 // No client has been bound yet or the client runs tasks on another | 607 // No client has been bound yet or the client runs tasks on another |
| 601 // thread. We assume the other thread must always be the one on which | 608 // thread. We assume the other thread must always be the one on which |
| 602 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 609 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
| 603 // | 610 // |
| 604 // If the client is not yet bound, it must be bound by the time this task | 611 // If the client is not yet bound, it must be bound by the time this task |
| 605 // runs or else it's programmer error. | 612 // runs or else it's programmer error. |
| 606 DCHECK(proxy_task_runner_); | 613 DCHECK(proxy_task_runner_); |
| 607 | 614 |
| 608 if (message->has_flag(mojo::Message::kFlagIsSync)) { | 615 if (message->has_flag(mojo::Message::kFlagIsSync)) { |
| (...skipping 26 matching lines...) Expand all Loading... |
| 635 return client->HandleIncomingMessage(message); | 642 return client->HandleIncomingMessage(message); |
| 636 } | 643 } |
| 637 | 644 |
| 638 void AcceptOnProxyThread(mojo::Message message) { | 645 void AcceptOnProxyThread(mojo::Message message) { |
| 639 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 646 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 640 | 647 |
| 641 mojo::InterfaceId id = message.interface_id(); | 648 mojo::InterfaceId id = message.interface_id(); |
| 642 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); | 649 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
| 643 | 650 |
| 644 base::AutoLock locker(lock_); | 651 base::AutoLock locker(lock_); |
| 645 Endpoint* endpoint = GetEndpointForDispatch(id); | 652 Endpoint* endpoint = GetEndpointForDispatch(id, true /* close_on_insert */); |
| 646 if (!endpoint) | 653 if (!endpoint) |
| 647 return; | 654 return; |
| 648 | 655 |
| 649 mojo::InterfaceEndpointClient* client = endpoint->client(); | 656 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 650 if (!client) | 657 if (!client) |
| 651 return; | 658 return; |
| 652 | 659 |
| 653 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 660 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 654 | 661 |
| 655 // Sync messages should never make their way to this method. | 662 // Sync messages should never make their way to this method. |
| 656 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); | 663 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); |
| 657 | 664 |
| 658 bool result = false; | 665 bool result = false; |
| 659 { | 666 { |
| 660 base::AutoUnlock unlocker(lock_); | 667 base::AutoUnlock unlocker(lock_); |
| 661 result = client->HandleIncomingMessage(&message); | 668 result = client->HandleIncomingMessage(&message); |
| 662 } | 669 } |
| 663 | 670 |
| 664 if (!result) | 671 if (!result) |
| 665 RaiseError(); | 672 RaiseError(); |
| 666 } | 673 } |
| 667 | 674 |
| 668 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { | 675 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { |
| 669 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 676 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 670 | 677 |
| 671 base::AutoLock locker(lock_); | 678 base::AutoLock locker(lock_); |
| 672 Endpoint* endpoint = GetEndpointForDispatch(interface_id); | 679 Endpoint* endpoint = |
| 680 GetEndpointForDispatch(interface_id, true /* close_on_insert */); |
| 673 if (!endpoint) | 681 if (!endpoint) |
| 674 return; | 682 return; |
| 675 | 683 |
| 676 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 684 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 677 mojo::Message message = endpoint->PopSyncMessage(message_id); | 685 mojo::Message message = endpoint->PopSyncMessage(message_id); |
| 678 | 686 |
| 679 // The message must have already been dequeued by the endpoint waking up | 687 // The message must have already been dequeued by the endpoint waking up |
| 680 // from a sync wait. Nothing to do. | 688 // from a sync wait. Nothing to do. |
| 681 if (message.IsNull()) | 689 if (message.IsNull()) |
| 682 return; | 690 return; |
| 683 | 691 |
| 684 mojo::InterfaceEndpointClient* client = endpoint->client(); | 692 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 685 if (!client) | 693 if (!client) |
| 686 return; | 694 return; |
| 687 | 695 |
| 688 bool result = false; | 696 bool result = false; |
| 689 { | 697 { |
| 690 base::AutoUnlock unlocker(lock_); | 698 base::AutoUnlock unlocker(lock_); |
| 691 result = client->HandleIncomingMessage(&message); | 699 result = client->HandleIncomingMessage(&message); |
| 692 } | 700 } |
| 693 | 701 |
| 694 if (!result) | 702 if (!result) |
| 695 RaiseError(); | 703 RaiseError(); |
| 696 } | 704 } |
| 697 | 705 |
| 698 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { | 706 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool close_on_insert) { |
| 699 lock_.AssertAcquired(); | 707 lock_.AssertAcquired(); |
| 700 bool inserted = false; | 708 bool inserted = false; |
| 701 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 709 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 702 if (inserted) { | 710 if (inserted && close_on_insert) { |
| 703 MarkClosedAndMaybeRemove(endpoint); | 711 MarkClosedAndMaybeRemove(endpoint); |
| 704 if (!mojo::IsMasterInterfaceId(id)) | 712 if (!mojo::IsMasterInterfaceId(id)) |
| 705 control_message_proxy_.NotifyPeerEndpointClosed(id); | 713 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 706 return nullptr; | 714 return nullptr; |
| 707 } | 715 } |
| 708 | 716 |
| 709 if (endpoint->closed()) | 717 if (endpoint->closed()) |
| 710 return nullptr; | 718 return nullptr; |
| 711 | 719 |
| 712 return endpoint; | 720 return endpoint; |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 745 return true; | 753 return true; |
| 746 } | 754 } |
| 747 | 755 |
| 748 // Checked in places which must be run on the master endpoint's thread. | 756 // Checked in places which must be run on the master endpoint's thread. |
| 749 base::ThreadChecker thread_checker_; | 757 base::ThreadChecker thread_checker_; |
| 750 | 758 |
| 751 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 759 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 752 | 760 |
| 753 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; | 761 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; |
| 754 const bool set_interface_id_namespace_bit_; | 762 const bool set_interface_id_namespace_bit_; |
| 763 bool started_ = false; |
| 755 std::unique_ptr<mojo::Connector> connector_; | 764 std::unique_ptr<mojo::Connector> connector_; |
| 756 mojo::FilterChain filters_; | 765 mojo::FilterChain filters_; |
| 757 mojo::PipeControlMessageHandler control_message_handler_; | 766 mojo::PipeControlMessageHandler control_message_handler_; |
| 758 ControlMessageProxyThunk control_message_proxy_thunk_; | 767 ControlMessageProxyThunk control_message_proxy_thunk_; |
| 759 mojo::PipeControlMessageProxy control_message_proxy_; | 768 mojo::PipeControlMessageProxy control_message_proxy_; |
| 760 | 769 |
| 761 // Outgoing messages that were sent before this controller was bound to a | 770 // Outgoing messages that were sent before this controller was bound to a |
| 762 // real message pipe. | 771 // real message pipe. |
| 763 std::vector<mojo::Message> outgoing_messages_; | 772 std::vector<mojo::Message> outgoing_messages_; |
| 764 | 773 |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 796 void Connect() override { | 805 void Connect() override { |
| 797 controller_->Bind(std::move(handle_)); | 806 controller_->Bind(std::move(handle_)); |
| 798 | 807 |
| 799 IPC::mojom::ChannelAssociatedPtr sender; | 808 IPC::mojom::ChannelAssociatedPtr sender; |
| 800 IPC::mojom::ChannelAssociatedRequest receiver; | 809 IPC::mojom::ChannelAssociatedRequest receiver; |
| 801 controller_->CreateChannelEndpoints(&sender, &receiver); | 810 controller_->CreateChannelEndpoints(&sender, &receiver); |
| 802 | 811 |
| 803 delegate_->OnPipesAvailable(std::move(sender), std::move(receiver)); | 812 delegate_->OnPipesAvailable(std::move(sender), std::move(receiver)); |
| 804 } | 813 } |
| 805 | 814 |
| 815 void Start() override { |
| 816 controller_->Start(); |
| 817 } |
| 818 |
| 819 void Flush() override { |
| 820 controller_->FlushOutgoingMessages(); |
| 821 } |
| 822 |
| 806 mojo::AssociatedGroup* GetAssociatedGroup() override { | 823 mojo::AssociatedGroup* GetAssociatedGroup() override { |
| 807 return associated_group_.get(); | 824 return associated_group_.get(); |
| 808 } | 825 } |
| 809 | 826 |
| 810 scoped_refptr<ChannelAssociatedGroupController> controller_; | 827 scoped_refptr<ChannelAssociatedGroupController> controller_; |
| 811 | 828 |
| 812 mojo::ScopedMessagePipeHandle handle_; | 829 mojo::ScopedMessagePipeHandle handle_; |
| 813 Delegate* delegate_; | 830 Delegate* delegate_; |
| 814 std::unique_ptr<mojo::AssociatedGroup> associated_group_; | 831 std::unique_ptr<mojo::AssociatedGroup> associated_group_; |
| 815 | 832 |
| 816 DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl); | 833 DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl); |
| 817 }; | 834 }; |
| 818 | 835 |
| 819 } // namespace | 836 } // namespace |
| 820 | 837 |
| 821 // static | 838 // static |
| 822 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( | 839 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( |
| 823 mojo::ScopedMessagePipeHandle handle, | 840 mojo::ScopedMessagePipeHandle handle, |
| 824 Channel::Mode mode, | 841 Channel::Mode mode, |
| 825 Delegate* delegate, | 842 Delegate* delegate, |
| 826 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 843 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
| 827 return base::MakeUnique<MojoBootstrapImpl>( | 844 return base::MakeUnique<MojoBootstrapImpl>( |
| 828 std::move(handle), delegate, | 845 std::move(handle), delegate, |
| 829 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, | 846 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, |
| 830 ipc_task_runner)); | 847 ipc_task_runner)); |
| 831 } | 848 } |
| 832 | 849 |
| 833 } // namespace IPC | 850 } // namespace IPC |
| OLD | NEW |