| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
| (...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 347 header->port_name, | 347 header->port_name, |
| 348 *GetEventData<ObserveProxyEventData>(*message)); | 348 *GetEventData<ObserveProxyEventData>(*message)); |
| 349 case EventType::kObserveProxyAck: | 349 case EventType::kObserveProxyAck: |
| 350 return OnObserveProxyAck( | 350 return OnObserveProxyAck( |
| 351 header->port_name, | 351 header->port_name, |
| 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); | 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); |
| 353 case EventType::kObserveClosure: | 353 case EventType::kObserveClosure: |
| 354 return OnObserveClosure( | 354 return OnObserveClosure( |
| 355 header->port_name, | 355 header->port_name, |
| 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); | 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); |
| 357 case EventType::kMergePort: | |
| 358 return OnMergePort(header->port_name, | |
| 359 *GetEventData<MergePortEventData>(*message)); | |
| 360 } | 357 } |
| 361 return OOPS(ERROR_NOT_IMPLEMENTED); | 358 return OOPS(ERROR_NOT_IMPLEMENTED); |
| 362 } | 359 } |
| 363 | 360 |
| 364 int Node::MergePorts(const PortRef& port_ref, | |
| 365 const NodeName& destination_node_name, | |
| 366 const PortName& destination_port_name) { | |
| 367 Port* port = port_ref.port(); | |
| 368 { | |
| 369 // |ports_lock_| must be held for WillSendPort_Locked below. | |
| 370 base::AutoLock ports_lock(ports_lock_); | |
| 371 base::AutoLock lock(port->lock); | |
| 372 | |
| 373 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ | |
| 374 << " to " << destination_port_name << "@" << destination_node_name; | |
| 375 | |
| 376 // Send the port-to-merge over to the destination node so it can be merged | |
| 377 // into the port cycle atomically there. | |
| 378 MergePortEventData data; | |
| 379 data.new_port_name = port_ref.name(); | |
| 380 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, | |
| 381 &data.new_port_descriptor); | |
| 382 delegate_->ForwardMessage( | |
| 383 destination_node_name, | |
| 384 NewInternalMessage(destination_port_name, | |
| 385 EventType::kMergePort, data)); | |
| 386 } | |
| 387 return OK; | |
| 388 } | |
| 389 | |
| 390 int Node::LostConnectionToNode(const NodeName& node_name) { | 361 int Node::LostConnectionToNode(const NodeName& node_name) { |
| 391 // We can no longer send events to the given node. We also can't expect any | 362 // We can no longer send events to the given node. We also can't expect any |
| 392 // PortAccepted events. | 363 // PortAccepted events. |
| 393 | 364 |
| 394 DVLOG(1) << "Observing lost connection from node " << name_ | 365 DVLOG(1) << "Observing lost connection from node " << name_ |
| 395 << " to node " << node_name; | 366 << " to node " << node_name; |
| 396 | 367 |
| 397 std::vector<PortRef> ports_to_notify; | 368 std::vector<PortRef> ports_to_notify; |
| 398 | 369 |
| 399 { | 370 { |
| (...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 531 { | 502 { |
| 532 // We must hold |ports_lock_| before grabbing the port lock because | 503 // We must hold |ports_lock_| before grabbing the port lock because |
| 533 // ForwardMessages_Locked requires it to be held. | 504 // ForwardMessages_Locked requires it to be held. |
| 534 base::AutoLock ports_lock(ports_lock_); | 505 base::AutoLock ports_lock(ports_lock_); |
| 535 base::AutoLock lock(port->lock); | 506 base::AutoLock lock(port->lock); |
| 536 | 507 |
| 537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
| 538 << " pointing to " | 509 << " pointing to " |
| 539 << port->peer_port_name << "@" << port->peer_node_name; | 510 << port->peer_port_name << "@" << port->peer_node_name; |
| 540 | 511 |
| 541 return BeginProxying_Locked(port.get(), port_name); | 512 if (port->state != Port::kBuffering) |
| 513 return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
| 514 |
| 515 port->state = Port::kProxying; |
| 516 |
| 517 int rv = ForwardMessages_Locked(port.get(), port_name); |
| 518 if (rv != OK) |
| 519 return rv; |
| 520 |
| 521 // We may have observed closure before receiving PortAccepted. In that |
| 522 // case, we can advance to removing the proxy without sending out an |
| 523 // ObserveProxy message. We already know the last expected message, etc. |
| 524 |
| 525 if (port->remove_proxy_on_last_message) { |
| 526 MaybeRemoveProxy_Locked(port.get(), port_name); |
| 527 |
| 528 // Make sure we propagate closure to our current peer. |
| 529 ObserveClosureEventData data; |
| 530 data.last_sequence_num = port->last_sequence_num_to_receive; |
| 531 delegate_->ForwardMessage( |
| 532 port->peer_node_name, |
| 533 NewInternalMessage(port->peer_port_name, |
| 534 EventType::kObserveClosure, data)); |
| 535 } else { |
| 536 InitiateProxyRemoval_Locked(port.get(), port_name); |
| 537 } |
| 542 } | 538 } |
| 539 return OK; |
| 543 } | 540 } |
| 544 | 541 |
| 545 int Node::OnObserveProxy(const PortName& port_name, | 542 int Node::OnObserveProxy(const PortName& port_name, |
| 546 const ObserveProxyEventData& event) { | 543 const ObserveProxyEventData& event) { |
| 547 // The port may have already been closed locally, in which case the | 544 // The port may have already been closed locally, in which case the |
| 548 // ObserveClosure message will contain the last_sequence_num field. | 545 // ObserveClosure message will contain the last_sequence_num field. |
| 549 // We can then silently ignore this message. | 546 // We can then silently ignore this message. |
| 550 scoped_refptr<Port> port = GetPort(port_name); | 547 scoped_refptr<Port> port = GetPort(port_name); |
| 551 if (!port) { | 548 if (!port) { |
| 552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
| (...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 717 NewInternalMessage(port->peer_port_name, | 714 NewInternalMessage(port->peer_port_name, |
| 718 EventType::kObserveClosure, forwarded_data)); | 715 EventType::kObserveClosure, forwarded_data)); |
| 719 } | 716 } |
| 720 if (notify_delegate) { | 717 if (notify_delegate) { |
| 721 PortRef port_ref(port_name, port); | 718 PortRef port_ref(port_name, port); |
| 722 delegate_->PortStatusChanged(port_ref); | 719 delegate_->PortStatusChanged(port_ref); |
| 723 } | 720 } |
| 724 return OK; | 721 return OK; |
| 725 } | 722 } |
| 726 | 723 |
| 727 int Node::OnMergePort(const PortName& port_name, | |
| 728 const MergePortEventData& event) { | |
| 729 scoped_refptr<Port> port = GetPort(port_name); | |
| 730 if (!port) | |
| 731 return ERROR_PORT_UNKNOWN; | |
| 732 | |
| 733 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" | |
| 734 << port->state << ") merging with proxy " << event.new_port_name | |
| 735 << "@" << name_ << " pointing to " | |
| 736 << event.new_port_descriptor.peer_port_name << "@" | |
| 737 << event.new_port_descriptor.peer_node_name << " referred by " | |
| 738 << event.new_port_descriptor.referring_port_name << "@" | |
| 739 << event.new_port_descriptor.referring_node_name; | |
| 740 | |
| 741 bool close_target_port = false; | |
| 742 bool close_new_port = false; | |
| 743 | |
| 744 // Accept the new port. This is now the receiving end of the other port cycle | |
| 745 // to be merged with ours. | |
| 746 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); | |
| 747 if (rv != OK) { | |
| 748 close_target_port = true; | |
| 749 } else { | |
| 750 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn | |
| 751 // needs to hold |ports_lock_|. We also acquire multiple port locks within. | |
| 752 base::AutoLock ports_lock(ports_lock_); | |
| 753 base::AutoLock lock(port->lock); | |
| 754 | |
| 755 if (port->state != Port::kReceiving) { | |
| 756 close_new_port = true; | |
| 757 } else { | |
| 758 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); | |
| 759 base::AutoLock new_port_lock(new_port->lock); | |
| 760 DCHECK(new_port->state == Port::kReceiving); | |
| 761 | |
| 762 // Both ports are locked. Now all we have to do is swap their peer | |
| 763 // information and set them up as proxies. | |
| 764 | |
| 765 std::swap(port->peer_node_name, new_port->peer_node_name); | |
| 766 std::swap(port->peer_port_name, new_port->peer_port_name); | |
| 767 std::swap(port->peer_closed, new_port->peer_closed); | |
| 768 | |
| 769 port->state = Port::kBuffering; | |
| 770 if (port->peer_closed) | |
| 771 port->remove_proxy_on_last_message = true; | |
| 772 | |
| 773 new_port->state = Port::kBuffering; | |
| 774 if (new_port->peer_closed) | |
| 775 new_port->remove_proxy_on_last_message = true; | |
| 776 | |
| 777 int rv1 = BeginProxying_Locked(port.get(), port_name); | |
| 778 int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); | |
| 779 | |
| 780 if (rv1 == OK && rv2 == OK) | |
| 781 return OK; | |
| 782 | |
| 783 // If either proxy failed to initialize (e.g. had undeliverable messages | |
| 784 // or ended up in a bad state somehow), we keep the system in a consistent | |
| 785 // state by undoing the peer swap and closing both merge ports. | |
| 786 | |
| 787 std::swap(port->peer_node_name, new_port->peer_node_name); | |
| 788 std::swap(port->peer_port_name, new_port->peer_port_name); | |
| 789 port->state = Port::kReceiving; | |
| 790 new_port->state = Port::kReceiving; | |
| 791 close_new_port = true; | |
| 792 close_target_port = true; | |
| 793 } | |
| 794 } | |
| 795 | |
| 796 if (close_target_port) { | |
| 797 PortRef target_port; | |
| 798 rv = GetPort(port_name, &target_port); | |
| 799 DCHECK(rv == OK); | |
| 800 | |
| 801 ClosePort(target_port); | |
| 802 } | |
| 803 | |
| 804 if (close_new_port) { | |
| 805 PortRef new_port; | |
| 806 rv = GetPort(event.new_port_name, &new_port); | |
| 807 DCHECK(rv == OK); | |
| 808 | |
| 809 ClosePort(new_port); | |
| 810 } | |
| 811 | |
| 812 return ERROR_PORT_STATE_UNEXPECTED; | |
| 813 } | |
| 814 | |
| 815 int Node::AddPortWithName(const PortName& port_name, | 724 int Node::AddPortWithName(const PortName& port_name, |
| 816 const scoped_refptr<Port>& port) { | 725 const scoped_refptr<Port>& port) { |
| 817 base::AutoLock lock(ports_lock_); | 726 base::AutoLock lock(ports_lock_); |
| 818 | 727 |
| 819 if (!ports_.insert(std::make_pair(port_name, port)).second) | 728 if (!ports_.insert(std::make_pair(port_name, port)).second) |
| 820 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. | 729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. |
| 821 | 730 |
| 822 DVLOG(2) << "Created port " << port_name << "@" << name_; | 731 DVLOG(2) << "Created port " << port_name << "@" << name_; |
| 823 return OK; | 732 return OK; |
| 824 } | 733 } |
| (...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 992 << GetEventData<UserEventData>(*message)->sequence_num | 901 << GetEventData<UserEventData>(*message)->sequence_num |
| 993 << " [ports=" << ports_buf.str() << "]" | 902 << " [ports=" << ports_buf.str() << "]" |
| 994 << " from " << port_name << "@" << name_ | 903 << " from " << port_name << "@" << name_ |
| 995 << " to " << port->peer_port_name << "@" << port->peer_node_name; | 904 << " to " << port->peer_port_name << "@" << port->peer_node_name; |
| 996 #endif | 905 #endif |
| 997 | 906 |
| 998 GetMutableEventHeader(message)->port_name = port->peer_port_name; | 907 GetMutableEventHeader(message)->port_name = port->peer_port_name; |
| 999 return OK; | 908 return OK; |
| 1000 } | 909 } |
| 1001 | 910 |
| 1002 int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { | |
| 1003 ports_lock_.AssertAcquired(); | |
| 1004 port->lock.AssertAcquired(); | |
| 1005 | |
| 1006 if (port->state != Port::kBuffering) | |
| 1007 return OOPS(ERROR_PORT_STATE_UNEXPECTED); | |
| 1008 | |
| 1009 port->state = Port::kProxying; | |
| 1010 | |
| 1011 int rv = ForwardMessages_Locked(port, port_name); | |
| 1012 if (rv != OK) | |
| 1013 return rv; | |
| 1014 | |
| 1015 // We may have observed closure while buffering. In that case, we can advance | |
| 1016 // to removing the proxy without sending out an ObserveProxy message. We | |
| 1017 // already know the last expected message, etc. | |
| 1018 | |
| 1019 if (port->remove_proxy_on_last_message) { | |
| 1020 MaybeRemoveProxy_Locked(port, port_name); | |
| 1021 | |
| 1022 // Make sure we propagate closure to our current peer. | |
| 1023 ObserveClosureEventData data; | |
| 1024 data.last_sequence_num = port->last_sequence_num_to_receive; | |
| 1025 delegate_->ForwardMessage( | |
| 1026 port->peer_node_name, | |
| 1027 NewInternalMessage(port->peer_port_name, | |
| 1028 EventType::kObserveClosure, data)); | |
| 1029 } else { | |
| 1030 InitiateProxyRemoval_Locked(port, port_name); | |
| 1031 } | |
| 1032 | |
| 1033 return OK; | |
| 1034 } | |
| 1035 | |
| 1036 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { | 911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
| 1037 ports_lock_.AssertAcquired(); | 912 ports_lock_.AssertAcquired(); |
| 1038 port->lock.AssertAcquired(); | 913 port->lock.AssertAcquired(); |
| 1039 | 914 |
| 1040 for (;;) { | 915 for (;;) { |
| 1041 ScopedMessage message; | 916 ScopedMessage message; |
| 1042 port->message_queue.GetNextMessageIf(nullptr, &message); | 917 port->message_queue.GetNextMessageIf(nullptr, &message); |
| 1043 if (!message) | 918 if (!message) |
| 1044 break; | 919 break; |
| 1045 | 920 |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1114 | 989 |
| 1115 if (num_data_bytes) | 990 if (num_data_bytes) |
| 1116 memcpy(header + 1, data, num_data_bytes); | 991 memcpy(header + 1, data, num_data_bytes); |
| 1117 | 992 |
| 1118 return message; | 993 return message; |
| 1119 } | 994 } |
| 1120 | 995 |
| 1121 } // namespace ports | 996 } // namespace ports |
| 1122 } // namespace edk | 997 } // namespace edk |
| 1123 } // namespace mojo | 998 } // namespace mojo |
| OLD | NEW |