Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(141)

Side by Side Diff: mojo/edk/system/ports/node.cc

Issue 1785843002: [mojo] Implement pipe fusion API (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/public/c/system/message_pipe.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/ports/node.h" 5 #include "mojo/edk/system/ports/node.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/debug/alias.h" 9 #include "base/debug/alias.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
(...skipping 370 matching lines...) Expand 10 before | Expand all | Expand 10 after
381 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, 381 WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
382 &data.new_port_descriptor); 382 &data.new_port_descriptor);
383 delegate_->ForwardMessage( 383 delegate_->ForwardMessage(
384 destination_node_name, 384 destination_node_name,
385 NewInternalMessage(destination_port_name, 385 NewInternalMessage(destination_port_name,
386 EventType::kMergePort, data)); 386 EventType::kMergePort, data));
387 } 387 }
388 return OK; 388 return OK;
389 } 389 }
390 390
391 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
392 Port* port0 = port0_ref.port();
393 Port* port1 = port1_ref.port();
394 int rv;
395 {
396 // |ports_lock_| must be held when acquiring overlapping port locks.
397 base::AutoLock ports_lock(ports_lock_);
398 base::AutoLock port0_lock(port0->lock);
399 base::AutoLock port1_lock(port1->lock);
400
401 DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
402 << " and " << port1_ref.name() << "@" << name_;
403
404 if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
405 rv = ERROR_PORT_STATE_UNEXPECTED;
406 else
407 rv = MergePorts_Locked(port0_ref, port1_ref);
408 }
409
410 if (rv != OK) {
411 ClosePort(port0_ref);
412 ClosePort(port1_ref);
413 }
414
415 return rv;
416 }
417
391 int Node::LostConnectionToNode(const NodeName& node_name) { 418 int Node::LostConnectionToNode(const NodeName& node_name) {
392 // We can no longer send events to the given node. We also can't expect any 419 // We can no longer send events to the given node. We also can't expect any
393 // PortAccepted events. 420 // PortAccepted events.
394 421
395 DVLOG(1) << "Observing lost connection from node " << name_ 422 DVLOG(1) << "Observing lost connection from node " << name_
396 << " to node " << node_name; 423 << " to node " << node_name;
397 424
398 std::vector<PortRef> ports_to_notify; 425 std::vector<PortRef> ports_to_notify;
399 426
400 { 427 {
(...skipping 355 matching lines...) Expand 10 before | Expand all | Expand 10 after
756 if (port->state != Port::kReceiving) { 783 if (port->state != Port::kReceiving) {
757 close_new_port = true; 784 close_new_port = true;
758 } else { 785 } else {
759 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); 786 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
760 base::AutoLock new_port_lock(new_port->lock); 787 base::AutoLock new_port_lock(new_port->lock);
761 DCHECK(new_port->state == Port::kReceiving); 788 DCHECK(new_port->state == Port::kReceiving);
762 789
763 // Both ports are locked. Now all we have to do is swap their peer 790 // Both ports are locked. Now all we have to do is swap their peer
764 // information and set them up as proxies. 791 // information and set them up as proxies.
765 792
766 std::swap(port->peer_node_name, new_port->peer_node_name); 793 PortRef port0_ref(port_name, port);
767 std::swap(port->peer_port_name, new_port->peer_port_name); 794 PortRef port1_ref(event.new_port_name, new_port);
768 std::swap(port->peer_closed, new_port->peer_closed); 795 int rv = MergePorts_Locked(port0_ref, port1_ref);
796 if (rv == OK)
797 return rv;
769 798
770 port->state = Port::kBuffering;
771 if (port->peer_closed)
772 port->remove_proxy_on_last_message = true;
773
774 new_port->state = Port::kBuffering;
775 if (new_port->peer_closed)
776 new_port->remove_proxy_on_last_message = true;
777
778 int rv1 = BeginProxying_Locked(port.get(), port_name);
779 int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
780
781 if (rv1 == OK && rv2 == OK)
782 return OK;
783
784 // If either proxy failed to initialize (e.g. had undeliverable messages
785 // or ended up in a bad state somehow), we keep the system in a consistent
786 // state by undoing the peer swap and closing both merge ports.
787
788 std::swap(port->peer_node_name, new_port->peer_node_name);
789 std::swap(port->peer_port_name, new_port->peer_port_name);
790 port->state = Port::kReceiving;
791 new_port->state = Port::kReceiving;
792 close_new_port = true; 799 close_new_port = true;
793 close_target_port = true; 800 close_target_port = true;
794 } 801 }
795 } 802 }
796 803
797 if (close_target_port) { 804 if (close_target_port) {
798 PortRef target_port; 805 PortRef target_port;
799 rv = GetPort(port_name, &target_port); 806 rv = GetPort(port_name, &target_port);
800 DCHECK(rv == OK); 807 DCHECK(rv == OK);
801 808
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
842 849
843 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) { 850 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
844 ports_lock_.AssertAcquired(); 851 ports_lock_.AssertAcquired();
845 auto iter = ports_.find(port_name); 852 auto iter = ports_.find(port_name);
846 if (iter == ports_.end()) 853 if (iter == ports_.end())
847 return nullptr; 854 return nullptr;
848 855
849 return iter->second; 856 return iter->second;
850 } 857 }
851 858
859 int Node::MergePorts_Locked(const PortRef& port0_ref,
860 const PortRef& port1_ref) {
861 Port* port0 = port0_ref.port();
862 Port* port1 = port1_ref.port();
863
864 ports_lock_.AssertAcquired();
865 port0->lock.AssertAcquired();
866 port1->lock.AssertAcquired();
867
868 CHECK(port0->state == Port::kReceiving);
869 CHECK(port1->state == Port::kReceiving);
870
871 // Ports cannot be merged with their own receiving peer!
872 if (port0->peer_node_name == name_ &&
873 port0->peer_port_name == port1_ref.name())
874 return ERROR_PORT_STATE_UNEXPECTED;
875
876 if (port1->peer_node_name == name_ &&
877 port1->peer_port_name == port0_ref.name())
878 return ERROR_PORT_STATE_UNEXPECTED;
879
880 // Only merge if both ports have never sent a message.
881 if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
882 port1->next_sequence_num_to_send == kInitialSequenceNum) {
883 // Swap the ports' peer information and switch them both into buffering
884 // (eventually proxying) mode.
885
886 std::swap(port0->peer_node_name, port1->peer_node_name);
887 std::swap(port0->peer_port_name, port1->peer_port_name);
888 std::swap(port0->peer_closed, port1->peer_closed);
889
890 port0->state = Port::kBuffering;
891 if (port0->peer_closed)
892 port0->remove_proxy_on_last_message = true;
893
894 port1->state = Port::kBuffering;
895 if (port1->peer_closed)
896 port1->remove_proxy_on_last_message = true;
897
898 int rv1 = BeginProxying_Locked(port0, port0_ref.name());
899 int rv2 = BeginProxying_Locked(port1, port1_ref.name());
900
901 if (rv1 == OK && rv2 == OK) {
902 // If either merged port had a closed peer, its new peer needs to be
903 // informed of this.
904 if (port1->peer_closed) {
905 ObserveClosureEventData data;
906 data.last_sequence_num = port0->last_sequence_num_to_receive;
907 delegate_->ForwardMessage(
908 port0->peer_node_name,
909 NewInternalMessage(port0->peer_port_name,
910 EventType::kObserveClosure, data));
911 }
912
913 if (port0->peer_closed) {
914 ObserveClosureEventData data;
915 data.last_sequence_num = port1->last_sequence_num_to_receive;
916 delegate_->ForwardMessage(
917 port1->peer_node_name,
918 NewInternalMessage(port1->peer_port_name,
919 EventType::kObserveClosure, data));
920 }
921
922 return OK;
923 }
924
925 // If either proxy failed to initialize (e.g. had undeliverable messages
926 // or ended up in a bad state somehow), we keep the system in a consistent
927 // state by undoing the peer swap.
928 std::swap(port0->peer_node_name, port1->peer_node_name);
929 std::swap(port0->peer_port_name, port1->peer_port_name);
930 std::swap(port0->peer_closed, port1->peer_closed);
931 port0->remove_proxy_on_last_message = false;
932 port1->remove_proxy_on_last_message = false;
933 port0->state = Port::kReceiving;
934 port1->state = Port::kReceiving;
935 }
936
937 return ERROR_PORT_STATE_UNEXPECTED;
938 }
939
852 void Node::WillSendPort_Locked(Port* port, 940 void Node::WillSendPort_Locked(Port* port,
853 const NodeName& to_node_name, 941 const NodeName& to_node_name,
854 PortName* port_name, 942 PortName* port_name,
855 PortDescriptor* port_descriptor) { 943 PortDescriptor* port_descriptor) {
856 ports_lock_.AssertAcquired(); 944 ports_lock_.AssertAcquired();
857 port->lock.AssertAcquired(); 945 port->lock.AssertAcquired();
858 946
859 PortName local_port_name = *port_name; 947 PortName local_port_name = *port_name;
860 948
861 PortName new_port_name; 949 PortName new_port_name;
(...skipping 257 matching lines...) Expand 10 before | Expand all | Expand 10 after
1119 1207
1120 if (num_data_bytes) 1208 if (num_data_bytes)
1121 memcpy(header + 1, data, num_data_bytes); 1209 memcpy(header + 1, data, num_data_bytes);
1122 1210
1123 return message; 1211 return message;
1124 } 1212 }
1125 1213
1126 } // namespace ports 1214 } // namespace ports
1127 } // namespace edk 1215 } // namespace edk
1128 } // namespace mojo 1216 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/public/c/system/message_pipe.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698