| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
| (...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 347 header->port_name, | 347 header->port_name, |
| 348 *GetEventData<ObserveProxyEventData>(*message)); | 348 *GetEventData<ObserveProxyEventData>(*message)); |
| 349 case EventType::kObserveProxyAck: | 349 case EventType::kObserveProxyAck: |
| 350 return OnObserveProxyAck( | 350 return OnObserveProxyAck( |
| 351 header->port_name, | 351 header->port_name, |
| 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); | 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); |
| 353 case EventType::kObserveClosure: | 353 case EventType::kObserveClosure: |
| 354 return OnObserveClosure( | 354 return OnObserveClosure( |
| 355 header->port_name, | 355 header->port_name, |
| 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); | 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); |
| 357 case EventType::kMergePort: |
| 358 return OnMergePort(header->port_name, |
| 359 *GetEventData<MergePortEventData>(*message)); |
| 357 } | 360 } |
| 358 return OOPS(ERROR_NOT_IMPLEMENTED); | 361 return OOPS(ERROR_NOT_IMPLEMENTED); |
| 359 } | 362 } |
| 360 | 363 |
| 364 int Node::MergePorts(const PortRef& port_ref, |
| 365 const NodeName& destination_node_name, |
| 366 const PortName& destination_port_name) { |
| 367 Port* port = port_ref.port(); |
| 368 { |
| 369 // |ports_lock_| must be held for WillSendPort_Locked below. |
| 370 base::AutoLock ports_lock(ports_lock_); |
| 371 base::AutoLock lock(port->lock); |
| 372 |
| 373 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ |
| 374 << " to " << destination_port_name << "@" << destination_node_name; |
| 375 |
| 376 // Send the port-to-merge over to the destination node so it can be merged |
| 377 // into the port cycle atomically there. |
| 378 MergePortEventData data; |
| 379 data.new_port_name = port_ref.name(); |
| 380 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, |
| 381 &data.new_port_descriptor); |
| 382 delegate_->ForwardMessage( |
| 383 destination_node_name, |
| 384 NewInternalMessage(destination_port_name, |
| 385 EventType::kMergePort, data)); |
| 386 } |
| 387 return OK; |
| 388 } |
| 389 |
| 361 int Node::LostConnectionToNode(const NodeName& node_name) { | 390 int Node::LostConnectionToNode(const NodeName& node_name) { |
| 362 // We can no longer send events to the given node. We also can't expect any | 391 // We can no longer send events to the given node. We also can't expect any |
| 363 // PortAccepted events. | 392 // PortAccepted events. |
| 364 | 393 |
| 365 DVLOG(1) << "Observing lost connection from node " << name_ | 394 DVLOG(1) << "Observing lost connection from node " << name_ |
| 366 << " to node " << node_name; | 395 << " to node " << node_name; |
| 367 | 396 |
| 368 std::vector<PortRef> ports_to_notify; | 397 std::vector<PortRef> ports_to_notify; |
| 369 | 398 |
| 370 { | 399 { |
| (...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 502 { | 531 { |
| 503 // We must hold |ports_lock_| before grabbing the port lock because | 532 // We must hold |ports_lock_| before grabbing the port lock because |
| 504 // ForwardMessages_Locked requires it to be held. | 533 // ForwardMessages_Locked requires it to be held. |
| 505 base::AutoLock ports_lock(ports_lock_); | 534 base::AutoLock ports_lock(ports_lock_); |
| 506 base::AutoLock lock(port->lock); | 535 base::AutoLock lock(port->lock); |
| 507 | 536 |
| 508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
| 509 << " pointing to " | 538 << " pointing to " |
| 510 << port->peer_port_name << "@" << port->peer_node_name; | 539 << port->peer_port_name << "@" << port->peer_node_name; |
| 511 | 540 |
| 512 if (port->state != Port::kBuffering) | 541 return BeginProxying_Locked(port.get(), port_name); |
| 513 return OOPS(ERROR_PORT_STATE_UNEXPECTED); | |
| 514 | |
| 515 port->state = Port::kProxying; | |
| 516 | |
| 517 int rv = ForwardMessages_Locked(port.get(), port_name); | |
| 518 if (rv != OK) | |
| 519 return rv; | |
| 520 | |
| 521 // We may have observed closure before receiving PortAccepted. In that | |
| 522 // case, we can advance to removing the proxy without sending out an | |
| 523 // ObserveProxy message. We already know the last expected message, etc. | |
| 524 | |
| 525 if (port->remove_proxy_on_last_message) { | |
| 526 MaybeRemoveProxy_Locked(port.get(), port_name); | |
| 527 | |
| 528 // Make sure we propagate closure to our current peer. | |
| 529 ObserveClosureEventData data; | |
| 530 data.last_sequence_num = port->last_sequence_num_to_receive; | |
| 531 delegate_->ForwardMessage( | |
| 532 port->peer_node_name, | |
| 533 NewInternalMessage(port->peer_port_name, | |
| 534 EventType::kObserveClosure, data)); | |
| 535 } else { | |
| 536 InitiateProxyRemoval_Locked(port.get(), port_name); | |
| 537 } | |
| 538 } | 542 } |
| 539 return OK; | |
| 540 } | 543 } |
| 541 | 544 |
| 542 int Node::OnObserveProxy(const PortName& port_name, | 545 int Node::OnObserveProxy(const PortName& port_name, |
| 543 const ObserveProxyEventData& event) { | 546 const ObserveProxyEventData& event) { |
| 544 // The port may have already been closed locally, in which case the | 547 // The port may have already been closed locally, in which case the |
| 545 // ObserveClosure message will contain the last_sequence_num field. | 548 // ObserveClosure message will contain the last_sequence_num field. |
| 546 // We can then silently ignore this message. | 549 // We can then silently ignore this message. |
| 547 scoped_refptr<Port> port = GetPort(port_name); | 550 scoped_refptr<Port> port = GetPort(port_name); |
| 548 if (!port) { | 551 if (!port) { |
| 549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
| (...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 714 NewInternalMessage(port->peer_port_name, | 717 NewInternalMessage(port->peer_port_name, |
| 715 EventType::kObserveClosure, forwarded_data)); | 718 EventType::kObserveClosure, forwarded_data)); |
| 716 } | 719 } |
| 717 if (notify_delegate) { | 720 if (notify_delegate) { |
| 718 PortRef port_ref(port_name, port); | 721 PortRef port_ref(port_name, port); |
| 719 delegate_->PortStatusChanged(port_ref); | 722 delegate_->PortStatusChanged(port_ref); |
| 720 } | 723 } |
| 721 return OK; | 724 return OK; |
| 722 } | 725 } |
| 723 | 726 |
| 727 int Node::OnMergePort(const PortName& port_name, |
| 728 const MergePortEventData& event) { |
| 729 scoped_refptr<Port> port = GetPort(port_name); |
| 730 if (!port) |
| 731 return ERROR_PORT_UNKNOWN; |
| 732 |
| 733 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" |
| 734 << port->state << ") merging with proxy " << event.new_port_name |
| 735 << "@" << name_ << " pointing to " |
| 736 << event.new_port_descriptor.peer_port_name << "@" |
| 737 << event.new_port_descriptor.peer_node_name << " referred by " |
| 738 << event.new_port_descriptor.referring_port_name << "@" |
| 739 << event.new_port_descriptor.referring_node_name; |
| 740 |
| 741 bool close_target_port = false; |
| 742 bool close_new_port = false; |
| 743 |
| 744 // Accept the new port. This is now the receiving end of the other port cycle |
| 745 // to be merged with ours. |
| 746 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); |
| 747 if (rv != OK) { |
| 748 close_target_port = true; |
| 749 } else { |
| 750 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn |
| 751 // needs to hold |ports_lock_|. We also acquire multiple port locks within. |
| 752 base::AutoLock ports_lock(ports_lock_); |
| 753 base::AutoLock lock(port->lock); |
| 754 |
| 755 if (port->state != Port::kReceiving) { |
| 756 close_new_port = true; |
| 757 } else { |
| 758 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); |
| 759 base::AutoLock new_port_lock(new_port->lock); |
| 760 DCHECK(new_port->state == Port::kReceiving); |
| 761 |
| 762 // Both ports are locked. Now all we have to do is swap their peer |
| 763 // information and set them up as proxies. |
| 764 |
| 765 std::swap(port->peer_node_name, new_port->peer_node_name); |
| 766 std::swap(port->peer_port_name, new_port->peer_port_name); |
| 767 std::swap(port->peer_closed, new_port->peer_closed); |
| 768 |
| 769 port->state = Port::kBuffering; |
| 770 if (port->peer_closed) |
| 771 port->remove_proxy_on_last_message = true; |
| 772 |
| 773 new_port->state = Port::kBuffering; |
| 774 if (new_port->peer_closed) |
| 775 new_port->remove_proxy_on_last_message = true; |
| 776 |
| 777 int rv1 = BeginProxying_Locked(port.get(), port_name); |
| 778 int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); |
| 779 |
| 780 if (rv1 == OK && rv2 == OK) |
| 781 return OK; |
| 782 |
| 783 // If either proxy failed to initialize (e.g. had undeliverable messages |
| 784 // or ended up in a bad state somehow), we keep the system in a consistent |
| 785 // state by undoing the peer swap and closing both merge ports. |
| 786 |
| 787 std::swap(port->peer_node_name, new_port->peer_node_name); |
| 788 std::swap(port->peer_port_name, new_port->peer_port_name); |
| 789 port->state = Port::kReceiving; |
| 790 new_port->state = Port::kReceiving; |
| 791 close_new_port = true; |
| 792 close_target_port = true; |
| 793 } |
| 794 } |
| 795 |
| 796 if (close_target_port) { |
| 797 PortRef target_port; |
| 798 rv = GetPort(port_name, &target_port); |
| 799 DCHECK(rv == OK); |
| 800 |
| 801 ClosePort(target_port); |
| 802 } |
| 803 |
| 804 if (close_new_port) { |
| 805 PortRef new_port; |
| 806 rv = GetPort(event.new_port_name, &new_port); |
| 807 DCHECK(rv == OK); |
| 808 |
| 809 ClosePort(new_port); |
| 810 } |
| 811 |
| 812 return ERROR_PORT_STATE_UNEXPECTED; |
| 813 } |
| 814 |
| 724 int Node::AddPortWithName(const PortName& port_name, | 815 int Node::AddPortWithName(const PortName& port_name, |
| 725 const scoped_refptr<Port>& port) { | 816 const scoped_refptr<Port>& port) { |
| 726 base::AutoLock lock(ports_lock_); | 817 base::AutoLock lock(ports_lock_); |
| 727 | 818 |
| 728 if (!ports_.insert(std::make_pair(port_name, port)).second) | 819 if (!ports_.insert(std::make_pair(port_name, port)).second) |
| 729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. | 820 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. |
| 730 | 821 |
| 731 DVLOG(2) << "Created port " << port_name << "@" << name_; | 822 DVLOG(2) << "Created port " << port_name << "@" << name_; |
| 732 return OK; | 823 return OK; |
| 733 } | 824 } |
| (...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 901 << GetEventData<UserEventData>(*message)->sequence_num | 992 << GetEventData<UserEventData>(*message)->sequence_num |
| 902 << " [ports=" << ports_buf.str() << "]" | 993 << " [ports=" << ports_buf.str() << "]" |
| 903 << " from " << port_name << "@" << name_ | 994 << " from " << port_name << "@" << name_ |
| 904 << " to " << port->peer_port_name << "@" << port->peer_node_name; | 995 << " to " << port->peer_port_name << "@" << port->peer_node_name; |
| 905 #endif | 996 #endif |
| 906 | 997 |
| 907 GetMutableEventHeader(message)->port_name = port->peer_port_name; | 998 GetMutableEventHeader(message)->port_name = port->peer_port_name; |
| 908 return OK; | 999 return OK; |
| 909 } | 1000 } |
| 910 | 1001 |
| 1002 int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { |
| 1003 ports_lock_.AssertAcquired(); |
| 1004 port->lock.AssertAcquired(); |
| 1005 |
| 1006 if (port->state != Port::kBuffering) |
| 1007 return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
| 1008 |
| 1009 port->state = Port::kProxying; |
| 1010 |
| 1011 int rv = ForwardMessages_Locked(port, port_name); |
| 1012 if (rv != OK) |
| 1013 return rv; |
| 1014 |
| 1015 // We may have observed closure while buffering. In that case, we can advance |
| 1016 // to removing the proxy without sending out an ObserveProxy message. We |
| 1017 // already know the last expected message, etc. |
| 1018 |
| 1019 if (port->remove_proxy_on_last_message) { |
| 1020 MaybeRemoveProxy_Locked(port, port_name); |
| 1021 |
| 1022 // Make sure we propagate closure to our current peer. |
| 1023 ObserveClosureEventData data; |
| 1024 data.last_sequence_num = port->last_sequence_num_to_receive; |
| 1025 delegate_->ForwardMessage( |
| 1026 port->peer_node_name, |
| 1027 NewInternalMessage(port->peer_port_name, |
| 1028 EventType::kObserveClosure, data)); |
| 1029 } else { |
| 1030 InitiateProxyRemoval_Locked(port, port_name); |
| 1031 } |
| 1032 |
| 1033 return OK; |
| 1034 } |
| 1035 |
| 911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { | 1036 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
| 912 ports_lock_.AssertAcquired(); | 1037 ports_lock_.AssertAcquired(); |
| 913 port->lock.AssertAcquired(); | 1038 port->lock.AssertAcquired(); |
| 914 | 1039 |
| 915 for (;;) { | 1040 for (;;) { |
| 916 ScopedMessage message; | 1041 ScopedMessage message; |
| 917 port->message_queue.GetNextMessageIf(nullptr, &message); | 1042 port->message_queue.GetNextMessageIf(nullptr, &message); |
| 918 if (!message) | 1043 if (!message) |
| 919 break; | 1044 break; |
| 920 | 1045 |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 989 | 1114 |
| 990 if (num_data_bytes) | 1115 if (num_data_bytes) |
| 991 memcpy(header + 1, data, num_data_bytes); | 1116 memcpy(header + 1, data, num_data_bytes); |
| 992 | 1117 |
| 993 return message; | 1118 return message; |
| 994 } | 1119 } |
| 995 | 1120 |
| 996 } // namespace ports | 1121 } // namespace ports |
| 997 } // namespace edk | 1122 } // namespace edk |
| 998 } // namespace mojo | 1123 } // namespace mojo |
| OLD | NEW |