Side by Side Diff: mojo/edk/system/ports/node.cc

Issue 1675603002: [mojo-edk] Simplify multiprocess pipe bootstrap (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/ports/node.h" 5 #include "mojo/edk/system/ports/node.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
(...skipping 336 matching lines...)
347 header->port_name, 347 header->port_name,
348 *GetEventData<ObserveProxyEventData>(*message)); 348 *GetEventData<ObserveProxyEventData>(*message));
349 case EventType::kObserveProxyAck: 349 case EventType::kObserveProxyAck:
350 return OnObserveProxyAck( 350 return OnObserveProxyAck(
351 header->port_name, 351 header->port_name,
352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
353 case EventType::kObserveClosure: 353 case EventType::kObserveClosure:
354 return OnObserveClosure( 354 return OnObserveClosure(
355 header->port_name, 355 header->port_name,
356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
357 case EventType::kMergePort:
358 return OnMergePort(header->port_name,
359 *GetEventData<MergePortEventData>(*message));
357 } 360 }
358 return OOPS(ERROR_NOT_IMPLEMENTED); 361 return OOPS(ERROR_NOT_IMPLEMENTED);
359 } 362 }
360 363
364 int Node::MergePorts(const PortRef& port_ref,
365 const NodeName& destination_node_name,
366 const PortName& destination_port_name) {
367 Port* port = port_ref.port();
368 {
369 // |ports_lock_| must be held for WillSendPort_Locked below.
370 base::AutoLock ports_lock(ports_lock_);
371 base::AutoLock lock(port->lock);
372
373 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
374 << " to " << destination_port_name << "@" << destination_node_name;
375
376 // Send the port-to-merge over to the destination node so it can be merged
377 // into the port cycle atomically there.
378 MergePortEventData data;
379 data.new_port_name = port_ref.name();
380 WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
381 &data.new_port_descriptor);
382 delegate_->ForwardMessage(
383 destination_node_name,
384 NewInternalMessage(destination_port_name,
385 EventType::kMergePort, data));
386 }
387 return OK;
388 }
389
361 int Node::LostConnectionToNode(const NodeName& node_name) { 390 int Node::LostConnectionToNode(const NodeName& node_name) {
362 // We can no longer send events to the given node. We also can't expect any 391 // We can no longer send events to the given node. We also can't expect any
363 // PortAccepted events. 392 // PortAccepted events.
364 393
365 DVLOG(1) << "Observing lost connection from node " << name_ 394 DVLOG(1) << "Observing lost connection from node " << name_
366 << " to node " << node_name; 395 << " to node " << node_name;
367 396
368 std::vector<PortRef> ports_to_notify; 397 std::vector<PortRef> ports_to_notify;
369 398
370 { 399 {
(...skipping 131 matching lines...)
502 { 531 {
503 // We must hold |ports_lock_| before grabbing the port lock because 532 // We must hold |ports_lock_| before grabbing the port lock because
504 // ForwardMessages_Locked requires it to be held. 533 // ForwardMessages_Locked requires it to be held.
505 base::AutoLock ports_lock(ports_lock_); 534 base::AutoLock ports_lock(ports_lock_);
506 base::AutoLock lock(port->lock); 535 base::AutoLock lock(port->lock);
507 536
508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ 537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
509 << " pointing to " 538 << " pointing to "
510 << port->peer_port_name << "@" << port->peer_node_name; 539 << port->peer_port_name << "@" << port->peer_node_name;
511 540
512 if (port->state != Port::kBuffering) 541 return BeginProxying_Locked(port.get(), port_name);
513 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
514
515 port->state = Port::kProxying;
516
517 int rv = ForwardMessages_Locked(port.get(), port_name);
518 if (rv != OK)
519 return rv;
520
521 // We may have observed closure before receiving PortAccepted. In that
522 // case, we can advance to removing the proxy without sending out an
523 // ObserveProxy message. We already know the last expected message, etc.
524
525 if (port->remove_proxy_on_last_message) {
526 MaybeRemoveProxy_Locked(port.get(), port_name);
527
528 // Make sure we propagate closure to our current peer.
529 ObserveClosureEventData data;
530 data.last_sequence_num = port->last_sequence_num_to_receive;
531 delegate_->ForwardMessage(
532 port->peer_node_name,
533 NewInternalMessage(port->peer_port_name,
534 EventType::kObserveClosure, data));
535 } else {
536 InitiateProxyRemoval_Locked(port.get(), port_name);
537 }
538 } 542 }
539 return OK;
540 } 543 }
541 544
542 int Node::OnObserveProxy(const PortName& port_name, 545 int Node::OnObserveProxy(const PortName& port_name,
543 const ObserveProxyEventData& event) { 546 const ObserveProxyEventData& event) {
544 // The port may have already been closed locally, in which case the 547 // The port may have already been closed locally, in which case the
545 // ObserveClosure message will contain the last_sequence_num field. 548 // ObserveClosure message will contain the last_sequence_num field.
546 // We can then silently ignore this message. 549 // We can then silently ignore this message.
547 scoped_refptr<Port> port = GetPort(port_name); 550 scoped_refptr<Port> port = GetPort(port_name);
548 if (!port) { 551 if (!port) {
549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; 552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
(...skipping 164 matching lines...)
714 NewInternalMessage(port->peer_port_name, 717 NewInternalMessage(port->peer_port_name,
715 EventType::kObserveClosure, forwarded_data)); 718 EventType::kObserveClosure, forwarded_data));
716 } 719 }
717 if (notify_delegate) { 720 if (notify_delegate) {
718 PortRef port_ref(port_name, port); 721 PortRef port_ref(port_name, port);
719 delegate_->PortStatusChanged(port_ref); 722 delegate_->PortStatusChanged(port_ref);
720 } 723 }
721 return OK; 724 return OK;
722 } 725 }
723 726
727 int Node::OnMergePort(const PortName& port_name,
728 const MergePortEventData& event) {
729 scoped_refptr<Port> port = GetPort(port_name);
730 if (!port)
731 return ERROR_PORT_UNKNOWN;
732
733 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
734 << port->state << ") merging with proxy " << event.new_port_name
735 << "@" << name_ << " pointing to "
736 << event.new_port_descriptor.peer_port_name << "@"
737 << event.new_port_descriptor.peer_node_name << " referred by "
738 << event.new_port_descriptor.referring_port_name << "@"
739 << event.new_port_descriptor.referring_node_name;
740
741 // Accept the new port. This is now the receiving end of the other port cycle
742 // to be merged with ours.
743 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
744 if (rv != OK)
745 return rv;
746
747 {
748 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
749 // needs to hold |ports_lock_|. We also acquire multiple port locks within.
750 base::AutoLock ports_lock(ports_lock_);
751
752 base::AutoLock lock(port->lock);
753
754 if (port->state != Port::kReceiving)
755 return ERROR_PORT_STATE_UNEXPECTED;
756
757 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
darin (slow to review) 2016/02/05 17:46:45: Should we make some assertions about the state of the new port here?
Ken Rockot (use gerrit already) 2016/02/05 19:43:05: Done.
758 base::AutoLock new_port_lock(new_port->lock);
759
760 // Both ports are locked. Now all we have to do is swap their peer
761 // information and set them up as proxies.
762
763 std::swap(port->peer_node_name, new_port->peer_node_name);
764 std::swap(port->peer_port_name, new_port->peer_port_name);
765
766 port->state = Port::kBuffering;
767 if (port->peer_closed)
768 port->remove_proxy_on_last_message = true;
769
770 new_port->state = Port::kBuffering;
771 if (new_port->peer_closed)
772 new_port->remove_proxy_on_last_message = true;
773
774 rv = BeginProxying_Locked(port.get(), port_name);
775 if (rv != OK)
776 return rv;
darin (slow to review) 2016/02/05 17:46:45: Are we in a bad state potentially if we early return?
Ken Rockot (use gerrit already) 2016/02/05 19:43:05: Yeah, I kinda glossed over this before. Changed it.
777
778 return BeginProxying_Locked(new_port.get(), event.new_port_name);
779 }
780 }
781
724 int Node::AddPortWithName(const PortName& port_name, 782 int Node::AddPortWithName(const PortName& port_name,
725 const scoped_refptr<Port>& port) { 783 const scoped_refptr<Port>& port) {
726 base::AutoLock lock(ports_lock_); 784 base::AutoLock lock(ports_lock_);
727 785
728 if (!ports_.insert(std::make_pair(port_name, port)).second) 786 if (!ports_.insert(std::make_pair(port_name, port)).second)
729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. 787 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
730 788
731 DVLOG(2) << "Created port " << port_name << "@" << name_; 789 DVLOG(2) << "Created port " << port_name << "@" << name_;
732 return OK; 790 return OK;
733 } 791 }
(...skipping 167 matching lines...)
901 << GetEventData<UserEventData>(*message)->sequence_num 959 << GetEventData<UserEventData>(*message)->sequence_num
902 << " [ports=" << ports_buf.str() << "]" 960 << " [ports=" << ports_buf.str() << "]"
903 << " from " << port_name << "@" << name_ 961 << " from " << port_name << "@" << name_
904 << " to " << port->peer_port_name << "@" << port->peer_node_name; 962 << " to " << port->peer_port_name << "@" << port->peer_node_name;
905 #endif 963 #endif
906 964
907 GetMutableEventHeader(message)->port_name = port->peer_port_name; 965 GetMutableEventHeader(message)->port_name = port->peer_port_name;
908 return OK; 966 return OK;
909 } 967 }
910 968
969 int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
970 ports_lock_.AssertAcquired();
971 port->lock.AssertAcquired();
972
973 if (port->state != Port::kBuffering)
974 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
975
976 port->state = Port::kProxying;
977
978 int rv = ForwardMessages_Locked(port, port_name);
979 if (rv != OK)
980 return rv;
981
982 // We may have observed closure while buffering. In that case, we can advance
983 // to removing the proxy without sending out an ObserveProxy message. We
984 // already know the last expected message, etc.
985
986 if (port->remove_proxy_on_last_message) {
987 MaybeRemoveProxy_Locked(port, port_name);
988
989 // Make sure we propagate closure to our current peer.
990 ObserveClosureEventData data;
991 data.last_sequence_num = port->last_sequence_num_to_receive;
992 delegate_->ForwardMessage(
993 port->peer_node_name,
994 NewInternalMessage(port->peer_port_name,
995 EventType::kObserveClosure, data));
996 } else {
997 InitiateProxyRemoval_Locked(port, port_name);
998 }
999
1000 return OK;
1001 }
1002
911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { 1003 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
912 ports_lock_.AssertAcquired(); 1004 ports_lock_.AssertAcquired();
913 port->lock.AssertAcquired(); 1005 port->lock.AssertAcquired();
914 1006
915 for (;;) { 1007 for (;;) {
916 ScopedMessage message; 1008 ScopedMessage message;
917 port->message_queue.GetNextMessageIf(nullptr, &message); 1009 port->message_queue.GetNextMessageIf(nullptr, &message);
918 if (!message) 1010 if (!message)
919 break; 1011 break;
920 1012
(...skipping 68 matching lines...)
989 1081
990 if (num_data_bytes) 1082 if (num_data_bytes)
991 memcpy(header + 1, data, num_data_bytes); 1083 memcpy(header + 1, data, num_data_bytes);
992 1084
993 return message; 1085 return message;
994 } 1086 }
995 1087
996 } // namespace ports 1088 } // namespace ports
997 } // namespace edk 1089 } // namespace edk
998 } // namespace mojo 1090 } // namespace mojo
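
The essence of the new Node::MergePorts / OnMergePort path is the peer swap performed while both port locks are held: each port is the receiving end of its own port cycle, and exchanging their peer pointers (then flipping both ports into buffering/proxying state) splices the two cycles into a single pipe. The C++ sketch below is a minimal illustration of that splice under simplified assumptions; FakePort, MergeCycles, and the port names are made up for this example, and the sketch omits the locking, message forwarding, and proxy-removal work that the real code in node.cc performs.

```cpp
// Minimal illustration of the peer swap at the heart of this patch.
// FakePort, MergeCycles, and the port names below are hypothetical and
// exist only for this sketch; the real logic (with locking, message
// forwarding, and proxy removal) lives in Node::OnMergePort above.
#include <iostream>
#include <string>
#include <utility>

struct FakePort {
  std::string name;
  std::string peer_name;  // the port this port sends its messages to
  bool is_proxy;          // proxies forward traffic instead of queueing it
};

// Splice two port cycles into one: swap the peers of one port from each
// cycle, then turn both swapped ports into proxies.
void MergeCycles(FakePort* a, FakePort* b) {
  std::swap(a->peer_name, b->peer_name);
  a->is_proxy = true;
  b->is_proxy = true;
}

int main() {
  // Cycle 1: p0 <-> p1.  Cycle 2: q0 <-> q1.
  FakePort p0{"p0", "p1", false}, p1{"p1", "p0", false};
  FakePort q0{"q0", "q1", false}, q1{"q1", "q0", false};

  // Merge p1 and q1. Afterwards messages written to p0 travel
  // p0 -> p1 (proxy) -> q0, and messages written to q0 travel
  // q0 -> q1 (proxy) -> p0, so p0 and q0 behave like one pipe.
  MergeCycles(&p1, &q1);

  std::cout << "p1 forwards to " << p1.peer_name << "\n";  // prints q0
  std::cout << "q1 forwards to " << q1.peer_name << "\n";  // prints p0
  return 0;
}
```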