Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(539)

Side by Side Diff: mojo/edk/system/ports/node.cc

Issue 1675603002: [mojo-edk] Simplify multiprocess pipe bootstrap (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix some callers to work with sync APIs Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/ports_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/ports/node.h" 5 #include "mojo/edk/system/ports/node.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
(...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after
347 header->port_name, 347 header->port_name,
348 *GetEventData<ObserveProxyEventData>(*message)); 348 *GetEventData<ObserveProxyEventData>(*message));
349 case EventType::kObserveProxyAck: 349 case EventType::kObserveProxyAck:
350 return OnObserveProxyAck( 350 return OnObserveProxyAck(
351 header->port_name, 351 header->port_name,
352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
353 case EventType::kObserveClosure: 353 case EventType::kObserveClosure:
354 return OnObserveClosure( 354 return OnObserveClosure(
355 header->port_name, 355 header->port_name,
356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
357 case EventType::kMergePort:
358 return OnMergePort(header->port_name,
359 *GetEventData<MergePortEventData>(*message));
357 } 360 }
358 return OOPS(ERROR_NOT_IMPLEMENTED); 361 return OOPS(ERROR_NOT_IMPLEMENTED);
359 } 362 }
360 363
364 int Node::MergePorts(const PortRef& port_ref,
365 const NodeName& destination_node_name,
366 const PortName& destination_port_name) {
367 Port* port = port_ref.port();
368 {
369 // |ports_lock_| must be held for WillSendPort_Locked below.
370 base::AutoLock ports_lock(ports_lock_);
371 base::AutoLock lock(port->lock);
372
373 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
374 << " to " << destination_port_name << "@" << destination_node_name;
375
376 // Send the port-to-merge over to the destination node so it can be merged
377 // into the port cycle atomically there.
378 MergePortEventData data;
379 data.new_port_name = port_ref.name();
380 WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
381 &data.new_port_descriptor);
382 delegate_->ForwardMessage(
383 destination_node_name,
384 NewInternalMessage(destination_port_name,
385 EventType::kMergePort, data));
386 }
387 return OK;
388 }
389
361 int Node::LostConnectionToNode(const NodeName& node_name) { 390 int Node::LostConnectionToNode(const NodeName& node_name) {
362 // We can no longer send events to the given node. We also can't expect any 391 // We can no longer send events to the given node. We also can't expect any
363 // PortAccepted events. 392 // PortAccepted events.
364 393
365 DVLOG(1) << "Observing lost connection from node " << name_ 394 DVLOG(1) << "Observing lost connection from node " << name_
366 << " to node " << node_name; 395 << " to node " << node_name;
367 396
368 std::vector<PortRef> ports_to_notify; 397 std::vector<PortRef> ports_to_notify;
369 398
370 { 399 {
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
502 { 531 {
503 // We must hold |ports_lock_| before grabbing the port lock because 532 // We must hold |ports_lock_| before grabbing the port lock because
504 // ForwardMessages_Locked requires it to be held. 533 // ForwardMessages_Locked requires it to be held.
505 base::AutoLock ports_lock(ports_lock_); 534 base::AutoLock ports_lock(ports_lock_);
506 base::AutoLock lock(port->lock); 535 base::AutoLock lock(port->lock);
507 536
508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ 537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
509 << " pointing to " 538 << " pointing to "
510 << port->peer_port_name << "@" << port->peer_node_name; 539 << port->peer_port_name << "@" << port->peer_node_name;
511 540
512 if (port->state != Port::kBuffering) 541 return BeginProxying_Locked(port.get(), port_name);
513 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
514
515 port->state = Port::kProxying;
516
517 int rv = ForwardMessages_Locked(port.get(), port_name);
518 if (rv != OK)
519 return rv;
520
521 // We may have observed closure before receiving PortAccepted. In that
522 // case, we can advance to removing the proxy without sending out an
523 // ObserveProxy message. We already know the last expected message, etc.
524
525 if (port->remove_proxy_on_last_message) {
526 MaybeRemoveProxy_Locked(port.get(), port_name);
527
528 // Make sure we propagate closure to our current peer.
529 ObserveClosureEventData data;
530 data.last_sequence_num = port->last_sequence_num_to_receive;
531 delegate_->ForwardMessage(
532 port->peer_node_name,
533 NewInternalMessage(port->peer_port_name,
534 EventType::kObserveClosure, data));
535 } else {
536 InitiateProxyRemoval_Locked(port.get(), port_name);
537 }
538 } 542 }
539 return OK;
540 } 543 }
541 544
542 int Node::OnObserveProxy(const PortName& port_name, 545 int Node::OnObserveProxy(const PortName& port_name,
543 const ObserveProxyEventData& event) { 546 const ObserveProxyEventData& event) {
544 // The port may have already been closed locally, in which case the 547 // The port may have already been closed locally, in which case the
545 // ObserveClosure message will contain the last_sequence_num field. 548 // ObserveClosure message will contain the last_sequence_num field.
546 // We can then silently ignore this message. 549 // We can then silently ignore this message.
547 scoped_refptr<Port> port = GetPort(port_name); 550 scoped_refptr<Port> port = GetPort(port_name);
548 if (!port) { 551 if (!port) {
549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; 552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after
714 NewInternalMessage(port->peer_port_name, 717 NewInternalMessage(port->peer_port_name,
715 EventType::kObserveClosure, forwarded_data)); 718 EventType::kObserveClosure, forwarded_data));
716 } 719 }
717 if (notify_delegate) { 720 if (notify_delegate) {
718 PortRef port_ref(port_name, port); 721 PortRef port_ref(port_name, port);
719 delegate_->PortStatusChanged(port_ref); 722 delegate_->PortStatusChanged(port_ref);
720 } 723 }
721 return OK; 724 return OK;
722 } 725 }
723 726
// Handles an incoming kMergePort event targeting local port |port_name|.
// |event| carries the remote side's port (|new_port_name| plus a descriptor)
// to be accepted locally and then spliced into this port's cycle by swapping
// peer information and turning both ports into proxies. On any failure the
// peer swap is undone and the affected port(s) are closed so the system stays
// consistent.
int Node::OnMergePort(const PortName& port_name,
                      const MergePortEventData& event) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
           << port->state << ") merging with proxy " << event.new_port_name
           << "@" << name_ << " pointing to "
           << event.new_port_descriptor.peer_port_name << "@"
           << event.new_port_descriptor.peer_node_name << " referred by "
           << event.new_port_descriptor.referring_port_name << "@"
           << event.new_port_descriptor.referring_node_name;

  // Failure-cleanup flags; acted on below, outside the lock scope, because
  // ClosePort must not be called while |ports_lock_| is held.
  bool close_target_port = false;
  bool close_new_port = false;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours.
  int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
  if (rv != OK) {
    close_target_port = true;
  } else {
    // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
    // needs to hold |ports_lock_|. We also acquire multiple port locks within.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving) {
      // Only a receiving-end port can be merged; the just-accepted port is
      // now orphaned and must be closed.
      close_new_port = true;
    } else {
      // NOTE(review): assumes AcceptPort left the new port registered, so
      // GetPort_Locked is non-null here — confirm AcceptPort cannot succeed
      // without inserting the port.
      scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
      base::AutoLock new_port_lock(new_port->lock);
      DCHECK(new_port->state == Port::kReceiving);

      // Both ports are locked. Now all we have to do is swap their peer
      // information and set them up as proxies.

      std::swap(port->peer_node_name, new_port->peer_node_name);
      std::swap(port->peer_port_name, new_port->peer_port_name);
      std::swap(port->peer_closed, new_port->peer_closed);

      // Each port must pass through kBuffering so BeginProxying_Locked can
      // advance it to kProxying; a port whose (new) peer is already closed
      // can be torn down as soon as its last message is forwarded.
      port->state = Port::kBuffering;
      if (port->peer_closed)
        port->remove_proxy_on_last_message = true;

      new_port->state = Port::kBuffering;
      if (new_port->peer_closed)
        new_port->remove_proxy_on_last_message = true;

      int rv1 = BeginProxying_Locked(port.get(), port_name);
      int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);

      if (rv1 == OK && rv2 == OK)
        return OK;

      // If either proxy failed to initialize (e.g. had undeliverable messages
      // or ended up in a bad state somehow), we keep the system in a consistent
      // state by undoing the peer swap and closing both merge ports.

      std::swap(port->peer_node_name, new_port->peer_node_name);
      std::swap(port->peer_port_name, new_port->peer_port_name);
      port->state = Port::kReceiving;
      new_port->state = Port::kReceiving;
      close_new_port = true;
      close_target_port = true;
    }
  }

  if (close_target_port) {
    PortRef target_port;
    rv = GetPort(port_name, &target_port);
    DCHECK(rv == OK);

    ClosePort(target_port);
  }

  if (close_new_port) {
    PortRef new_port;
    rv = GetPort(event.new_port_name, &new_port);
    DCHECK(rv == OK);

    ClosePort(new_port);
  }

  // Reaching here means some part of the merge failed (success returned OK
  // above, inside the lock scope).
  return ERROR_PORT_STATE_UNEXPECTED;
}
814
724 int Node::AddPortWithName(const PortName& port_name, 815 int Node::AddPortWithName(const PortName& port_name,
725 const scoped_refptr<Port>& port) { 816 const scoped_refptr<Port>& port) {
726 base::AutoLock lock(ports_lock_); 817 base::AutoLock lock(ports_lock_);
727 818
728 if (!ports_.insert(std::make_pair(port_name, port)).second) 819 if (!ports_.insert(std::make_pair(port_name, port)).second)
729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. 820 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
730 821
731 DVLOG(2) << "Created port " << port_name << "@" << name_; 822 DVLOG(2) << "Created port " << port_name << "@" << name_;
732 return OK; 823 return OK;
733 } 824 }
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after
901 << GetEventData<UserEventData>(*message)->sequence_num 992 << GetEventData<UserEventData>(*message)->sequence_num
902 << " [ports=" << ports_buf.str() << "]" 993 << " [ports=" << ports_buf.str() << "]"
903 << " from " << port_name << "@" << name_ 994 << " from " << port_name << "@" << name_
904 << " to " << port->peer_port_name << "@" << port->peer_node_name; 995 << " to " << port->peer_port_name << "@" << port->peer_node_name;
905 #endif 996 #endif
906 997
907 GetMutableEventHeader(message)->port_name = port->peer_port_name; 998 GetMutableEventHeader(message)->port_name = port->peer_port_name;
908 return OK; 999 return OK;
909 } 1000 }
910 1001
// Transitions |port| from the buffering state into the proxying state and
// flushes any messages queued while buffering. Must be called with both
// |ports_lock_| and the port's own lock held. Returns
// ERROR_PORT_STATE_UNEXPECTED if the port was not buffering, or the error
// from forwarding queued messages.
int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  if (port->state != Port::kBuffering)
    return OOPS(ERROR_PORT_STATE_UNEXPECTED);

  port->state = Port::kProxying;

  // Drain messages that accumulated while the port was buffering.
  int rv = ForwardMessages_Locked(port, port_name);
  if (rv != OK)
    return rv;

  // We may have observed closure while buffering. In that case, we can advance
  // to removing the proxy without sending out an ObserveProxy message. We
  // already know the last expected message, etc.

  if (port->remove_proxy_on_last_message) {
    MaybeRemoveProxy_Locked(port, port_name);

    // Make sure we propagate closure to our current peer.
    ObserveClosureEventData data;
    data.last_sequence_num = port->last_sequence_num_to_receive;
    delegate_->ForwardMessage(
        port->peer_node_name,
        NewInternalMessage(port->peer_port_name,
                           EventType::kObserveClosure, data));
  } else {
    // Normal path: kick off the ObserveProxy handshake so the proxy can be
    // removed once all in-flight messages have bypassed it.
    InitiateProxyRemoval_Locked(port, port_name);
  }

  return OK;
}
1035
911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { 1036 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
912 ports_lock_.AssertAcquired(); 1037 ports_lock_.AssertAcquired();
913 port->lock.AssertAcquired(); 1038 port->lock.AssertAcquired();
914 1039
915 for (;;) { 1040 for (;;) {
916 ScopedMessage message; 1041 ScopedMessage message;
917 port->message_queue.GetNextMessageIf(nullptr, &message); 1042 port->message_queue.GetNextMessageIf(nullptr, &message);
918 if (!message) 1043 if (!message)
919 break; 1044 break;
920 1045
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
989 1114
990 if (num_data_bytes) 1115 if (num_data_bytes)
991 memcpy(header + 1, data, num_data_bytes); 1116 memcpy(header + 1, data, num_data_bytes);
992 1117
993 return message; 1118 return message;
994 } 1119 }
995 1120
996 } // namespace ports 1121 } // namespace ports
997 } // namespace edk 1122 } // namespace edk
998 } // namespace mojo 1123 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/ports_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698