Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: mojo/edk/system/ports/node.cc

Issue 1678333003: Revert of [mojo-edk] Simplify multiprocess pipe bootstrap (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/ports_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show/Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/ports/node.h" 5 #include "mojo/edk/system/ports/node.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
(...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after
347 header->port_name, 347 header->port_name,
348 *GetEventData<ObserveProxyEventData>(*message)); 348 *GetEventData<ObserveProxyEventData>(*message));
349 case EventType::kObserveProxyAck: 349 case EventType::kObserveProxyAck:
350 return OnObserveProxyAck( 350 return OnObserveProxyAck(
351 header->port_name, 351 header->port_name,
352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
353 case EventType::kObserveClosure: 353 case EventType::kObserveClosure:
354 return OnObserveClosure( 354 return OnObserveClosure(
355 header->port_name, 355 header->port_name,
356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
357 case EventType::kMergePort:
358 return OnMergePort(header->port_name,
359 *GetEventData<MergePortEventData>(*message));
360 } 357 }
361 return OOPS(ERROR_NOT_IMPLEMENTED); 358 return OOPS(ERROR_NOT_IMPLEMENTED);
362 } 359 }
363 360
// Initiates a merge of the local port cycle containing |port_ref| with a
// remote port cycle: serializes the local port and sends a kMergePort event
// to |destination_port_name| on |destination_node_name|, where the merge is
// completed atomically (see OnMergePort).
//
// Returns OK once the message has been handed to the delegate; delivery
// failures surface later through node-level error handling, not here.
int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  Port* port = port_ref.port();
  {
    // |ports_lock_| must be held for WillSendPort_Locked below.
    // NOTE: lock order is |ports_lock_| first, then the individual port lock —
    // the same order used elsewhere in this file (e.g. OnMergePort).
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@" << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there. WillSendPort_Locked rewrites
    // |data.new_port_name| to the name the port will have on the destination
    // node and fills in the descriptor used to reconstruct it there.
    MergePortEventData data;
    data.new_port_name = port_ref.name();
    WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
                        &data.new_port_descriptor);
    delegate_->ForwardMessage(
        destination_node_name,
        NewInternalMessage(destination_port_name,
                           EventType::kMergePort, data));
  }
  return OK;
}
389
390 int Node::LostConnectionToNode(const NodeName& node_name) { 361 int Node::LostConnectionToNode(const NodeName& node_name) {
391 // We can no longer send events to the given node. We also can't expect any 362 // We can no longer send events to the given node. We also can't expect any
392 // PortAccepted events. 363 // PortAccepted events.
393 364
394 DVLOG(1) << "Observing lost connection from node " << name_ 365 DVLOG(1) << "Observing lost connection from node " << name_
395 << " to node " << node_name; 366 << " to node " << node_name;
396 367
397 std::vector<PortRef> ports_to_notify; 368 std::vector<PortRef> ports_to_notify;
398 369
399 { 370 {
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
531 { 502 {
532 // We must hold |ports_lock_| before grabbing the port lock because 503 // We must hold |ports_lock_| before grabbing the port lock because
533 // ForwardMessages_Locked requires it to be held. 504 // ForwardMessages_Locked requires it to be held.
534 base::AutoLock ports_lock(ports_lock_); 505 base::AutoLock ports_lock(ports_lock_);
535 base::AutoLock lock(port->lock); 506 base::AutoLock lock(port->lock);
536 507
537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ 508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
538 << " pointing to " 509 << " pointing to "
539 << port->peer_port_name << "@" << port->peer_node_name; 510 << port->peer_port_name << "@" << port->peer_node_name;
540 511
541 return BeginProxying_Locked(port.get(), port_name); 512 if (port->state != Port::kBuffering)
513 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
514
515 port->state = Port::kProxying;
516
517 int rv = ForwardMessages_Locked(port.get(), port_name);
518 if (rv != OK)
519 return rv;
520
521 // We may have observed closure before receiving PortAccepted. In that
522 // case, we can advance to removing the proxy without sending out an
523 // ObserveProxy message. We already know the last expected message, etc.
524
525 if (port->remove_proxy_on_last_message) {
526 MaybeRemoveProxy_Locked(port.get(), port_name);
527
528 // Make sure we propagate closure to our current peer.
529 ObserveClosureEventData data;
530 data.last_sequence_num = port->last_sequence_num_to_receive;
531 delegate_->ForwardMessage(
532 port->peer_node_name,
533 NewInternalMessage(port->peer_port_name,
534 EventType::kObserveClosure, data));
535 } else {
536 InitiateProxyRemoval_Locked(port.get(), port_name);
537 }
542 } 538 }
539 return OK;
543 } 540 }
544 541
545 int Node::OnObserveProxy(const PortName& port_name, 542 int Node::OnObserveProxy(const PortName& port_name,
546 const ObserveProxyEventData& event) { 543 const ObserveProxyEventData& event) {
547 // The port may have already been closed locally, in which case the 544 // The port may have already been closed locally, in which case the
548 // ObserveClosure message will contain the last_sequence_num field. 545 // ObserveClosure message will contain the last_sequence_num field.
549 // We can then silently ignore this message. 546 // We can then silently ignore this message.
550 scoped_refptr<Port> port = GetPort(port_name); 547 scoped_refptr<Port> port = GetPort(port_name);
551 if (!port) { 548 if (!port) {
552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; 549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after
717 NewInternalMessage(port->peer_port_name, 714 NewInternalMessage(port->peer_port_name,
718 EventType::kObserveClosure, forwarded_data)); 715 EventType::kObserveClosure, forwarded_data));
719 } 716 }
720 if (notify_delegate) { 717 if (notify_delegate) {
721 PortRef port_ref(port_name, port); 718 PortRef port_ref(port_name, port);
722 delegate_->PortStatusChanged(port_ref); 719 delegate_->PortStatusChanged(port_ref);
723 } 720 }
724 return OK; 721 return OK;
725 } 722 }
726 723
// Handles a kMergePort event (the receiving side of Node::MergePorts).
// Accepts the serialized port from |event|, then — with both ports locked —
// swaps peer information between the local target port and the newly accepted
// port and turns both into proxies, splicing the two port cycles together.
// On any failure the peer swap is undone and both ports are closed so the
// system stays consistent.
int Node::OnMergePort(const PortName& port_name,
                      const MergePortEventData& event) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
           << port->state << ") merging with proxy " << event.new_port_name
           << "@" << name_ << " pointing to "
           << event.new_port_descriptor.peer_port_name << "@"
           << event.new_port_descriptor.peer_node_name << " referred by "
           << event.new_port_descriptor.referring_port_name << "@"
           << event.new_port_descriptor.referring_node_name;

  // Failure-cleanup flags; acted on after all locks below are released,
  // because ClosePort cannot be called with these locks held.
  bool close_target_port = false;
  bool close_new_port = false;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours.
  int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
  if (rv != OK) {
    close_target_port = true;
  } else {
    // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
    // needs to hold |ports_lock_|. We also acquire multiple port locks within.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    // Only a receiving-end port can be merged; anything else is a protocol
    // violation by the sender, so discard the port we just accepted.
    if (port->state != Port::kReceiving) {
      close_new_port = true;
    } else {
      scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
      base::AutoLock new_port_lock(new_port->lock);
      DCHECK(new_port->state == Port::kReceiving);

      // Both ports are locked. Now all we have to do is swap their peer
      // information and set them up as proxies.

      std::swap(port->peer_node_name, new_port->peer_node_name);
      std::swap(port->peer_port_name, new_port->peer_port_name);
      std::swap(port->peer_closed, new_port->peer_closed);

      // Transition both ports to buffering so BeginProxying_Locked can flip
      // them to proxying. A peer that already closed means the proxy can be
      // torn down as soon as the last expected message passes through.
      port->state = Port::kBuffering;
      if (port->peer_closed)
        port->remove_proxy_on_last_message = true;

      new_port->state = Port::kBuffering;
      if (new_port->peer_closed)
        new_port->remove_proxy_on_last_message = true;

      int rv1 = BeginProxying_Locked(port.get(), port_name);
      int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);

      if (rv1 == OK && rv2 == OK)
        return OK;

      // If either proxy failed to initialize (e.g. had undeliverable messages
      // or ended up in a bad state somehow), we keep the system in a consistent
      // state by undoing the peer swap and closing both merge ports.

      std::swap(port->peer_node_name, new_port->peer_node_name);
      std::swap(port->peer_port_name, new_port->peer_port_name);
      port->state = Port::kReceiving;
      new_port->state = Port::kReceiving;
      close_new_port = true;
      close_target_port = true;
    }
  }

  // Locks are released; perform any deferred cleanup.
  if (close_target_port) {
    PortRef target_port;
    rv = GetPort(port_name, &target_port);
    DCHECK(rv == OK);

    ClosePort(target_port);
  }

  if (close_new_port) {
    PortRef new_port;
    rv = GetPort(event.new_port_name, &new_port);
    DCHECK(rv == OK);

    ClosePort(new_port);
  }

  // Reaching here means the merge did not complete successfully.
  return ERROR_PORT_STATE_UNEXPECTED;
}
814
815 int Node::AddPortWithName(const PortName& port_name, 724 int Node::AddPortWithName(const PortName& port_name,
816 const scoped_refptr<Port>& port) { 725 const scoped_refptr<Port>& port) {
817 base::AutoLock lock(ports_lock_); 726 base::AutoLock lock(ports_lock_);
818 727
819 if (!ports_.insert(std::make_pair(port_name, port)).second) 728 if (!ports_.insert(std::make_pair(port_name, port)).second)
820 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. 729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
821 730
822 DVLOG(2) << "Created port " << port_name << "@" << name_; 731 DVLOG(2) << "Created port " << port_name << "@" << name_;
823 return OK; 732 return OK;
824 } 733 }
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after
992 << GetEventData<UserEventData>(*message)->sequence_num 901 << GetEventData<UserEventData>(*message)->sequence_num
993 << " [ports=" << ports_buf.str() << "]" 902 << " [ports=" << ports_buf.str() << "]"
994 << " from " << port_name << "@" << name_ 903 << " from " << port_name << "@" << name_
995 << " to " << port->peer_port_name << "@" << port->peer_node_name; 904 << " to " << port->peer_port_name << "@" << port->peer_node_name;
996 #endif 905 #endif
997 906
998 GetMutableEventHeader(message)->port_name = port->peer_port_name; 907 GetMutableEventHeader(message)->port_name = port->peer_port_name;
999 return OK; 908 return OK;
1000 } 909 }
1001 910
// Transitions |port| from kBuffering to kProxying, flushes any messages that
// were buffered while awaiting acceptance, and either begins proxy removal or
// (if closure was already observed) removes the proxy immediately and
// propagates closure to the current peer.
//
// Preconditions: |ports_lock_| and |port->lock| are both held by the caller;
// |port| must be in the kBuffering state.
int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  if (port->state != Port::kBuffering)
    return OOPS(ERROR_PORT_STATE_UNEXPECTED);

  // Flip state before forwarding so the port behaves as a proxy from here on.
  port->state = Port::kProxying;

  int rv = ForwardMessages_Locked(port, port_name);
  if (rv != OK)
    return rv;

  // We may have observed closure while buffering. In that case, we can advance
  // to removing the proxy without sending out an ObserveProxy message. We
  // already know the last expected message, etc.

  if (port->remove_proxy_on_last_message) {
    MaybeRemoveProxy_Locked(port, port_name);

    // Make sure we propagate closure to our current peer.
    ObserveClosureEventData data;
    data.last_sequence_num = port->last_sequence_num_to_receive;
    delegate_->ForwardMessage(
        port->peer_node_name,
        NewInternalMessage(port->peer_port_name,
                           EventType::kObserveClosure, data));
  } else {
    // Normal path: start the ObserveProxy handshake that will eventually
    // allow this proxy to be removed.
    InitiateProxyRemoval_Locked(port, port_name);
  }

  return OK;
}
1035
1036 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { 911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
1037 ports_lock_.AssertAcquired(); 912 ports_lock_.AssertAcquired();
1038 port->lock.AssertAcquired(); 913 port->lock.AssertAcquired();
1039 914
1040 for (;;) { 915 for (;;) {
1041 ScopedMessage message; 916 ScopedMessage message;
1042 port->message_queue.GetNextMessageIf(nullptr, &message); 917 port->message_queue.GetNextMessageIf(nullptr, &message);
1043 if (!message) 918 if (!message)
1044 break; 919 break;
1045 920
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
1114 989
1115 if (num_data_bytes) 990 if (num_data_bytes)
1116 memcpy(header + 1, data, num_data_bytes); 991 memcpy(header + 1, data, num_data_bytes);
1117 992
1118 return message; 993 return message;
1119 } 994 }
1120 995
1121 } // namespace ports 996 } // namespace ports
1122 } // namespace edk 997 } // namespace edk
1123 } // namespace mojo 998 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/ports_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698