| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/debug/alias.h" | 9 #include "base/debug/alias.h" |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| (...skipping 370 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 381 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, | 381 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, |
| 382 &data.new_port_descriptor); | 382 &data.new_port_descriptor); |
| 383 delegate_->ForwardMessage( | 383 delegate_->ForwardMessage( |
| 384 destination_node_name, | 384 destination_node_name, |
| 385 NewInternalMessage(destination_port_name, | 385 NewInternalMessage(destination_port_name, |
| 386 EventType::kMergePort, data)); | 386 EventType::kMergePort, data)); |
| 387 } | 387 } |
| 388 return OK; | 388 return OK; |
| 389 } | 389 } |
| 390 | 390 |
| 391 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) { |
| 392 Port* port0 = port0_ref.port(); |
| 393 Port* port1 = port1_ref.port(); |
| 394 int rv; |
| 395 { |
| 396 // |ports_lock_| must be held when acquiring overlapping port locks. |
| 397 base::AutoLock ports_lock(ports_lock_); |
| 398 base::AutoLock port0_lock(port0->lock); |
| 399 base::AutoLock port1_lock(port1->lock); |
| 400 |
| 401 DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_ |
| 402 << " and " << port1_ref.name() << "@" << name_; |
| 403 |
| 404 if (port0->state != Port::kReceiving || port1->state != Port::kReceiving) |
| 405 rv = ERROR_PORT_STATE_UNEXPECTED; |
| 406 else |
| 407 rv = MergePorts_Locked(port0_ref, port1_ref); |
| 408 } |
| 409 |
| 410 if (rv != OK) { |
| 411 ClosePort(port0_ref); |
| 412 ClosePort(port1_ref); |
| 413 } |
| 414 |
| 415 return rv; |
| 416 } |
| 417 |
| 391 int Node::LostConnectionToNode(const NodeName& node_name) { | 418 int Node::LostConnectionToNode(const NodeName& node_name) { |
| 392 // We can no longer send events to the given node. We also can't expect any | 419 // We can no longer send events to the given node. We also can't expect any |
| 393 // PortAccepted events. | 420 // PortAccepted events. |
| 394 | 421 |
| 395 DVLOG(1) << "Observing lost connection from node " << name_ | 422 DVLOG(1) << "Observing lost connection from node " << name_ |
| 396 << " to node " << node_name; | 423 << " to node " << node_name; |
| 397 | 424 |
| 398 std::vector<PortRef> ports_to_notify; | 425 std::vector<PortRef> ports_to_notify; |
| 399 | 426 |
| 400 { | 427 { |
| (...skipping 355 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 756 if (port->state != Port::kReceiving) { | 783 if (port->state != Port::kReceiving) { |
| 757 close_new_port = true; | 784 close_new_port = true; |
| 758 } else { | 785 } else { |
| 759 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); | 786 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); |
| 760 base::AutoLock new_port_lock(new_port->lock); | 787 base::AutoLock new_port_lock(new_port->lock); |
| 761 DCHECK(new_port->state == Port::kReceiving); | 788 DCHECK(new_port->state == Port::kReceiving); |
| 762 | 789 |
| 763 // Both ports are locked. Now all we have to do is swap their peer | 790 // Both ports are locked. Now all we have to do is swap their peer |
| 764 // information and set them up as proxies. | 791 // information and set them up as proxies. |
| 765 | 792 |
| 766 std::swap(port->peer_node_name, new_port->peer_node_name); | 793 PortRef port0_ref(port_name, port); |
| 767 std::swap(port->peer_port_name, new_port->peer_port_name); | 794 PortRef port1_ref(event.new_port_name, new_port); |
| 768 std::swap(port->peer_closed, new_port->peer_closed); | 795 int rv = MergePorts_Locked(port0_ref, port1_ref); |
| 796 if (rv == OK) |
| 797 return rv; |
| 769 | 798 |
| 770 port->state = Port::kBuffering; | |
| 771 if (port->peer_closed) | |
| 772 port->remove_proxy_on_last_message = true; | |
| 773 | |
| 774 new_port->state = Port::kBuffering; | |
| 775 if (new_port->peer_closed) | |
| 776 new_port->remove_proxy_on_last_message = true; | |
| 777 | |
| 778 int rv1 = BeginProxying_Locked(port.get(), port_name); | |
| 779 int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); | |
| 780 | |
| 781 if (rv1 == OK && rv2 == OK) | |
| 782 return OK; | |
| 783 | |
| 784 // If either proxy failed to initialize (e.g. had undeliverable messages | |
| 785 // or ended up in a bad state somehow), we keep the system in a consistent | |
| 786 // state by undoing the peer swap and closing both merge ports. | |
| 787 | |
| 788 std::swap(port->peer_node_name, new_port->peer_node_name); | |
| 789 std::swap(port->peer_port_name, new_port->peer_port_name); | |
| 790 port->state = Port::kReceiving; | |
| 791 new_port->state = Port::kReceiving; | |
| 792 close_new_port = true; | 799 close_new_port = true; |
| 793 close_target_port = true; | 800 close_target_port = true; |
| 794 } | 801 } |
| 795 } | 802 } |
| 796 | 803 |
| 797 if (close_target_port) { | 804 if (close_target_port) { |
| 798 PortRef target_port; | 805 PortRef target_port; |
| 799 rv = GetPort(port_name, &target_port); | 806 rv = GetPort(port_name, &target_port); |
| 800 DCHECK(rv == OK); | 807 DCHECK(rv == OK); |
| 801 | 808 |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 842 | 849 |
| 843 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) { | 850 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) { |
| 844 ports_lock_.AssertAcquired(); | 851 ports_lock_.AssertAcquired(); |
| 845 auto iter = ports_.find(port_name); | 852 auto iter = ports_.find(port_name); |
| 846 if (iter == ports_.end()) | 853 if (iter == ports_.end()) |
| 847 return nullptr; | 854 return nullptr; |
| 848 | 855 |
| 849 return iter->second; | 856 return iter->second; |
| 850 } | 857 } |
| 851 | 858 |
| 859 int Node::MergePorts_Locked(const PortRef& port0_ref, |
| 860 const PortRef& port1_ref) { |
| 861 Port* port0 = port0_ref.port(); |
| 862 Port* port1 = port1_ref.port(); |
| 863 |
| 864 ports_lock_.AssertAcquired(); |
| 865 port0->lock.AssertAcquired(); |
| 866 port1->lock.AssertAcquired(); |
| 867 |
| 868 CHECK(port0->state == Port::kReceiving); |
| 869 CHECK(port1->state == Port::kReceiving); |
| 870 |
| 871 // Ports cannot be merged with their own receiving peer! |
| 872 if (port0->peer_node_name == name_ && |
| 873 port0->peer_port_name == port1_ref.name()) |
| 874 return ERROR_PORT_STATE_UNEXPECTED; |
| 875 |
| 876 if (port1->peer_node_name == name_ && |
| 877 port1->peer_port_name == port0_ref.name()) |
| 878 return ERROR_PORT_STATE_UNEXPECTED; |
| 879 |
| 880 // Only merge if both ports have never sent a message. |
| 881 if (port0->next_sequence_num_to_send == kInitialSequenceNum && |
| 882 port1->next_sequence_num_to_send == kInitialSequenceNum) { |
| 883 // Swap the ports' peer information and switch them both into buffering |
| 884 // (eventually proxying) mode. |
| 885 |
| 886 std::swap(port0->peer_node_name, port1->peer_node_name); |
| 887 std::swap(port0->peer_port_name, port1->peer_port_name); |
| 888 std::swap(port0->peer_closed, port1->peer_closed); |
| 889 |
| 890 port0->state = Port::kBuffering; |
| 891 if (port0->peer_closed) |
| 892 port0->remove_proxy_on_last_message = true; |
| 893 |
| 894 port1->state = Port::kBuffering; |
| 895 if (port1->peer_closed) |
| 896 port1->remove_proxy_on_last_message = true; |
| 897 |
| 898 int rv1 = BeginProxying_Locked(port0, port0_ref.name()); |
| 899 int rv2 = BeginProxying_Locked(port1, port1_ref.name()); |
| 900 |
| 901 if (rv1 == OK && rv2 == OK) { |
| 902 // If either merged port had a closed peer, its new peer needs to be |
| 903 // informed of this. |
| 904 if (port1->peer_closed) { |
| 905 ObserveClosureEventData data; |
| 906 data.last_sequence_num = port0->last_sequence_num_to_receive; |
| 907 delegate_->ForwardMessage( |
| 908 port0->peer_node_name, |
| 909 NewInternalMessage(port0->peer_port_name, |
| 910 EventType::kObserveClosure, data)); |
| 911 } |
| 912 |
| 913 if (port0->peer_closed) { |
| 914 ObserveClosureEventData data; |
| 915 data.last_sequence_num = port1->last_sequence_num_to_receive; |
| 916 delegate_->ForwardMessage( |
| 917 port1->peer_node_name, |
| 918 NewInternalMessage(port1->peer_port_name, |
| 919 EventType::kObserveClosure, data)); |
| 920 } |
| 921 |
| 922 return OK; |
| 923 } |
| 924 |
| 925 // If either proxy failed to initialize (e.g. had undeliverable messages |
| 926 // or ended up in a bad state somehow), we keep the system in a consistent |
| 927 // state by undoing the peer swap. |
| 928 std::swap(port0->peer_node_name, port1->peer_node_name); |
| 929 std::swap(port0->peer_port_name, port1->peer_port_name); |
| 930 std::swap(port0->peer_closed, port1->peer_closed); |
| 931 port0->remove_proxy_on_last_message = false; |
| 932 port1->remove_proxy_on_last_message = false; |
| 933 port0->state = Port::kReceiving; |
| 934 port1->state = Port::kReceiving; |
| 935 } |
| 936 |
| 937 return ERROR_PORT_STATE_UNEXPECTED; |
| 938 } |
| 939 |
| 852 void Node::WillSendPort_Locked(Port* port, | 940 void Node::WillSendPort_Locked(Port* port, |
| 853 const NodeName& to_node_name, | 941 const NodeName& to_node_name, |
| 854 PortName* port_name, | 942 PortName* port_name, |
| 855 PortDescriptor* port_descriptor) { | 943 PortDescriptor* port_descriptor) { |
| 856 ports_lock_.AssertAcquired(); | 944 ports_lock_.AssertAcquired(); |
| 857 port->lock.AssertAcquired(); | 945 port->lock.AssertAcquired(); |
| 858 | 946 |
| 859 PortName local_port_name = *port_name; | 947 PortName local_port_name = *port_name; |
| 860 | 948 |
| 861 PortName new_port_name; | 949 PortName new_port_name; |
| (...skipping 257 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1119 | 1207 |
| 1120 if (num_data_bytes) | 1208 if (num_data_bytes) |
| 1121 memcpy(header + 1, data, num_data_bytes); | 1209 memcpy(header + 1, data, num_data_bytes); |
| 1122 | 1210 |
| 1123 return message; | 1211 return message; |
| 1124 } | 1212 } |
| 1125 | 1213 |
| 1126 } // namespace ports | 1214 } // namespace ports |
| 1127 } // namespace edk | 1215 } // namespace edk |
| 1128 } // namespace mojo | 1216 } // namespace mojo |
| OLD | NEW |