OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
(...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
347 header->port_name, | 347 header->port_name, |
348 *GetEventData<ObserveProxyEventData>(*message)); | 348 *GetEventData<ObserveProxyEventData>(*message)); |
349 case EventType::kObserveProxyAck: | 349 case EventType::kObserveProxyAck: |
350 return OnObserveProxyAck( | 350 return OnObserveProxyAck( |
351 header->port_name, | 351 header->port_name, |
352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); | 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); |
353 case EventType::kObserveClosure: | 353 case EventType::kObserveClosure: |
354 return OnObserveClosure( | 354 return OnObserveClosure( |
355 header->port_name, | 355 header->port_name, |
356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); | 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); |
357 case EventType::kMergePort: | |
358 return OnMergePort(header->port_name, | |
359 *GetEventData<MergePortEventData>(*message)); | |
360 } | 357 } |
361 return OOPS(ERROR_NOT_IMPLEMENTED); | 358 return OOPS(ERROR_NOT_IMPLEMENTED); |
362 } | 359 } |
363 | 360 |
364 int Node::MergePorts(const PortRef& port_ref, | |
365 const NodeName& destination_node_name, | |
366 const PortName& destination_port_name) { | |
367 Port* port = port_ref.port(); | |
368 { | |
369 // |ports_lock_| must be held for WillSendPort_Locked below. | |
370 base::AutoLock ports_lock(ports_lock_); | |
371 base::AutoLock lock(port->lock); | |
372 | |
373 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ | |
374 << " to " << destination_port_name << "@" << destination_node_name; | |
375 | |
376 // Send the port-to-merge over to the destination node so it can be merged | |
377 // into the port cycle atomically there. | |
378 MergePortEventData data; | |
379 data.new_port_name = port_ref.name(); | |
380 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, | |
381 &data.new_port_descriptor); | |
382 delegate_->ForwardMessage( | |
383 destination_node_name, | |
384 NewInternalMessage(destination_port_name, | |
385 EventType::kMergePort, data)); | |
386 } | |
387 return OK; | |
388 } | |
389 | |
390 int Node::LostConnectionToNode(const NodeName& node_name) { | 361 int Node::LostConnectionToNode(const NodeName& node_name) { |
391 // We can no longer send events to the given node. We also can't expect any | 362 // We can no longer send events to the given node. We also can't expect any |
392 // PortAccepted events. | 363 // PortAccepted events. |
393 | 364 |
394 DVLOG(1) << "Observing lost connection from node " << name_ | 365 DVLOG(1) << "Observing lost connection from node " << name_ |
395 << " to node " << node_name; | 366 << " to node " << node_name; |
396 | 367 |
397 std::vector<PortRef> ports_to_notify; | 368 std::vector<PortRef> ports_to_notify; |
398 | 369 |
399 { | 370 { |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
531 { | 502 { |
532 // We must hold |ports_lock_| before grabbing the port lock because | 503 // We must hold |ports_lock_| before grabbing the port lock because |
533 // ForwardMessages_Locked requires it to be held. | 504 // ForwardMessages_Locked requires it to be held. |
534 base::AutoLock ports_lock(ports_lock_); | 505 base::AutoLock ports_lock(ports_lock_); |
535 base::AutoLock lock(port->lock); | 506 base::AutoLock lock(port->lock); |
536 | 507 |
537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
538 << " pointing to " | 509 << " pointing to " |
539 << port->peer_port_name << "@" << port->peer_node_name; | 510 << port->peer_port_name << "@" << port->peer_node_name; |
540 | 511 |
541 return BeginProxying_Locked(port.get(), port_name); | 512 if (port->state != Port::kBuffering) |
| 513 return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
| 514 |
| 515 port->state = Port::kProxying; |
| 516 |
| 517 int rv = ForwardMessages_Locked(port.get(), port_name); |
| 518 if (rv != OK) |
| 519 return rv; |
| 520 |
| 521 // We may have observed closure before receiving PortAccepted. In that |
| 522 // case, we can advance to removing the proxy without sending out an |
| 523 // ObserveProxy message. We already know the last expected message, etc. |
| 524 |
| 525 if (port->remove_proxy_on_last_message) { |
| 526 MaybeRemoveProxy_Locked(port.get(), port_name); |
| 527 |
| 528 // Make sure we propagate closure to our current peer. |
| 529 ObserveClosureEventData data; |
| 530 data.last_sequence_num = port->last_sequence_num_to_receive; |
| 531 delegate_->ForwardMessage( |
| 532 port->peer_node_name, |
| 533 NewInternalMessage(port->peer_port_name, |
| 534 EventType::kObserveClosure, data)); |
| 535 } else { |
| 536 InitiateProxyRemoval_Locked(port.get(), port_name); |
| 537 } |
542 } | 538 } |
| 539 return OK; |
543 } | 540 } |
544 | 541 |
545 int Node::OnObserveProxy(const PortName& port_name, | 542 int Node::OnObserveProxy(const PortName& port_name, |
546 const ObserveProxyEventData& event) { | 543 const ObserveProxyEventData& event) { |
547 // The port may have already been closed locally, in which case the | 544 // The port may have already been closed locally, in which case the |
548 // ObserveClosure message will contain the last_sequence_num field. | 545 // ObserveClosure message will contain the last_sequence_num field. |
549 // We can then silently ignore this message. | 546 // We can then silently ignore this message. |
550 scoped_refptr<Port> port = GetPort(port_name); | 547 scoped_refptr<Port> port = GetPort(port_name); |
551 if (!port) { | 548 if (!port) { |
552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
717 NewInternalMessage(port->peer_port_name, | 714 NewInternalMessage(port->peer_port_name, |
718 EventType::kObserveClosure, forwarded_data)); | 715 EventType::kObserveClosure, forwarded_data)); |
719 } | 716 } |
720 if (notify_delegate) { | 717 if (notify_delegate) { |
721 PortRef port_ref(port_name, port); | 718 PortRef port_ref(port_name, port); |
722 delegate_->PortStatusChanged(port_ref); | 719 delegate_->PortStatusChanged(port_ref); |
723 } | 720 } |
724 return OK; | 721 return OK; |
725 } | 722 } |
726 | 723 |
727 int Node::OnMergePort(const PortName& port_name, | |
728 const MergePortEventData& event) { | |
729 scoped_refptr<Port> port = GetPort(port_name); | |
730 if (!port) | |
731 return ERROR_PORT_UNKNOWN; | |
732 | |
733 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" | |
734 << port->state << ") merging with proxy " << event.new_port_name | |
735 << "@" << name_ << " pointing to " | |
736 << event.new_port_descriptor.peer_port_name << "@" | |
737 << event.new_port_descriptor.peer_node_name << " referred by " | |
738 << event.new_port_descriptor.referring_port_name << "@" | |
739 << event.new_port_descriptor.referring_node_name; | |
740 | |
741 bool close_target_port = false; | |
742 bool close_new_port = false; | |
743 | |
744 // Accept the new port. This is now the receiving end of the other port cycle | |
745 // to be merged with ours. | |
746 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); | |
747 if (rv != OK) { | |
748 close_target_port = true; | |
749 } else { | |
750 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn | |
751 // needs to hold |ports_lock_|. We also acquire multiple port locks within. | |
752 base::AutoLock ports_lock(ports_lock_); | |
753 base::AutoLock lock(port->lock); | |
754 | |
755 if (port->state != Port::kReceiving) { | |
756 close_new_port = true; | |
757 } else { | |
758 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); | |
759 base::AutoLock new_port_lock(new_port->lock); | |
760 DCHECK(new_port->state == Port::kReceiving); | |
761 | |
762 // Both ports are locked. Now all we have to do is swap their peer | |
763 // information and set them up as proxies. | |
764 | |
765 std::swap(port->peer_node_name, new_port->peer_node_name); | |
766 std::swap(port->peer_port_name, new_port->peer_port_name); | |
767 std::swap(port->peer_closed, new_port->peer_closed); | |
768 | |
769 port->state = Port::kBuffering; | |
770 if (port->peer_closed) | |
771 port->remove_proxy_on_last_message = true; | |
772 | |
773 new_port->state = Port::kBuffering; | |
774 if (new_port->peer_closed) | |
775 new_port->remove_proxy_on_last_message = true; | |
776 | |
777 int rv1 = BeginProxying_Locked(port.get(), port_name); | |
778 int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); | |
779 | |
780 if (rv1 == OK && rv2 == OK) | |
781 return OK; | |
782 | |
783 // If either proxy failed to initialize (e.g. had undeliverable messages | |
784 // or ended up in a bad state somehow), we keep the system in a consistent | |
785 // state by undoing the peer swap and closing both merge ports. | |
786 | |
787 std::swap(port->peer_node_name, new_port->peer_node_name); | |
788 std::swap(port->peer_port_name, new_port->peer_port_name); | |
789 port->state = Port::kReceiving; | |
790 new_port->state = Port::kReceiving; | |
791 close_new_port = true; | |
792 close_target_port = true; | |
793 } | |
794 } | |
795 | |
796 if (close_target_port) { | |
797 PortRef target_port; | |
798 rv = GetPort(port_name, &target_port); | |
799 DCHECK(rv == OK); | |
800 | |
801 ClosePort(target_port); | |
802 } | |
803 | |
804 if (close_new_port) { | |
805 PortRef new_port; | |
806 rv = GetPort(event.new_port_name, &new_port); | |
807 DCHECK(rv == OK); | |
808 | |
809 ClosePort(new_port); | |
810 } | |
811 | |
812 return ERROR_PORT_STATE_UNEXPECTED; | |
813 } | |
814 | |
815 int Node::AddPortWithName(const PortName& port_name, | 724 int Node::AddPortWithName(const PortName& port_name, |
816 const scoped_refptr<Port>& port) { | 725 const scoped_refptr<Port>& port) { |
817 base::AutoLock lock(ports_lock_); | 726 base::AutoLock lock(ports_lock_); |
818 | 727 |
819 if (!ports_.insert(std::make_pair(port_name, port)).second) | 728 if (!ports_.insert(std::make_pair(port_name, port)).second) |
820 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. | 729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. |
821 | 730 |
822 DVLOG(2) << "Created port " << port_name << "@" << name_; | 731 DVLOG(2) << "Created port " << port_name << "@" << name_; |
823 return OK; | 732 return OK; |
824 } | 733 } |
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
992 << GetEventData<UserEventData>(*message)->sequence_num | 901 << GetEventData<UserEventData>(*message)->sequence_num |
993 << " [ports=" << ports_buf.str() << "]" | 902 << " [ports=" << ports_buf.str() << "]" |
994 << " from " << port_name << "@" << name_ | 903 << " from " << port_name << "@" << name_ |
995 << " to " << port->peer_port_name << "@" << port->peer_node_name; | 904 << " to " << port->peer_port_name << "@" << port->peer_node_name; |
996 #endif | 905 #endif |
997 | 906 |
998 GetMutableEventHeader(message)->port_name = port->peer_port_name; | 907 GetMutableEventHeader(message)->port_name = port->peer_port_name; |
999 return OK; | 908 return OK; |
1000 } | 909 } |
1001 | 910 |
1002 int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { | |
1003 ports_lock_.AssertAcquired(); | |
1004 port->lock.AssertAcquired(); | |
1005 | |
1006 if (port->state != Port::kBuffering) | |
1007 return OOPS(ERROR_PORT_STATE_UNEXPECTED); | |
1008 | |
1009 port->state = Port::kProxying; | |
1010 | |
1011 int rv = ForwardMessages_Locked(port, port_name); | |
1012 if (rv != OK) | |
1013 return rv; | |
1014 | |
1015 // We may have observed closure while buffering. In that case, we can advance | |
1016 // to removing the proxy without sending out an ObserveProxy message. We | |
1017 // already know the last expected message, etc. | |
1018 | |
1019 if (port->remove_proxy_on_last_message) { | |
1020 MaybeRemoveProxy_Locked(port, port_name); | |
1021 | |
1022 // Make sure we propagate closure to our current peer. | |
1023 ObserveClosureEventData data; | |
1024 data.last_sequence_num = port->last_sequence_num_to_receive; | |
1025 delegate_->ForwardMessage( | |
1026 port->peer_node_name, | |
1027 NewInternalMessage(port->peer_port_name, | |
1028 EventType::kObserveClosure, data)); | |
1029 } else { | |
1030 InitiateProxyRemoval_Locked(port, port_name); | |
1031 } | |
1032 | |
1033 return OK; | |
1034 } | |
1035 | |
1036 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { | 911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
1037 ports_lock_.AssertAcquired(); | 912 ports_lock_.AssertAcquired(); |
1038 port->lock.AssertAcquired(); | 913 port->lock.AssertAcquired(); |
1039 | 914 |
1040 for (;;) { | 915 for (;;) { |
1041 ScopedMessage message; | 916 ScopedMessage message; |
1042 port->message_queue.GetNextMessageIf(nullptr, &message); | 917 port->message_queue.GetNextMessageIf(nullptr, &message); |
1043 if (!message) | 918 if (!message) |
1044 break; | 919 break; |
1045 | 920 |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1114 | 989 |
1115 if (num_data_bytes) | 990 if (num_data_bytes) |
1116 memcpy(header + 1, data, num_data_bytes); | 991 memcpy(header + 1, data, num_data_bytes); |
1117 | 992 |
1118 return message; | 993 return message; |
1119 } | 994 } |
1120 | 995 |
1121 } // namespace ports | 996 } // namespace ports |
1122 } // namespace edk | 997 } // namespace edk |
1123 } // namespace mojo | 998 } // namespace mojo |
OLD | NEW |