OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
(...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
347 header->port_name, | 347 header->port_name, |
348 *GetEventData<ObserveProxyEventData>(*message)); | 348 *GetEventData<ObserveProxyEventData>(*message)); |
349 case EventType::kObserveProxyAck: | 349 case EventType::kObserveProxyAck: |
350 return OnObserveProxyAck( | 350 return OnObserveProxyAck( |
351 header->port_name, | 351 header->port_name, |
352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); | 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); |
353 case EventType::kObserveClosure: | 353 case EventType::kObserveClosure: |
354 return OnObserveClosure( | 354 return OnObserveClosure( |
355 header->port_name, | 355 header->port_name, |
356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); | 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); |
| 357 case EventType::kMergePort: |
| 358 return OnMergePort(header->port_name, |
| 359 *GetEventData<MergePortEventData>(*message)); |
357 } | 360 } |
358 return OOPS(ERROR_NOT_IMPLEMENTED); | 361 return OOPS(ERROR_NOT_IMPLEMENTED); |
359 } | 362 } |
360 | 363 |
| 364 int Node::MergePorts(const PortRef& port_ref, |
| 365 const NodeName& destination_node_name, |
| 366 const PortName& destination_port_name) { |
| 367 Port* port = port_ref.port(); |
| 368 { |
| 369 // |ports_lock_| must be held for WillSendPort_Locked below. |
| 370 base::AutoLock ports_lock(ports_lock_); |
| 371 base::AutoLock lock(port->lock); |
| 372 |
| 373 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ |
| 374 << " to " << destination_port_name << "@" << destination_node_name; |
| 375 |
| 376 // Send the port-to-merge over to the destination node so it can be merged |
| 377 // into the port cycle atomically there. |
| 378 MergePortEventData data; |
| 379 data.new_port_name = port_ref.name(); |
| 380 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, |
| 381 &data.new_port_descriptor); |
| 382 delegate_->ForwardMessage( |
| 383 destination_node_name, |
| 384 NewInternalMessage(destination_port_name, |
| 385 EventType::kMergePort, data)); |
| 386 } |
| 387 return OK; |
| 388 } |
| 389 |
361 int Node::LostConnectionToNode(const NodeName& node_name) { | 390 int Node::LostConnectionToNode(const NodeName& node_name) { |
362 // We can no longer send events to the given node. We also can't expect any | 391 // We can no longer send events to the given node. We also can't expect any |
363 // PortAccepted events. | 392 // PortAccepted events. |
364 | 393 |
365 DVLOG(1) << "Observing lost connection from node " << name_ | 394 DVLOG(1) << "Observing lost connection from node " << name_ |
366 << " to node " << node_name; | 395 << " to node " << node_name; |
367 | 396 |
368 std::vector<PortRef> ports_to_notify; | 397 std::vector<PortRef> ports_to_notify; |
369 | 398 |
370 { | 399 { |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
502 { | 531 { |
503 // We must hold |ports_lock_| before grabbing the port lock because | 532 // We must hold |ports_lock_| before grabbing the port lock because |
504 // ForwardMessages_Locked requires it to be held. | 533 // ForwardMessages_Locked requires it to be held. |
505 base::AutoLock ports_lock(ports_lock_); | 534 base::AutoLock ports_lock(ports_lock_); |
506 base::AutoLock lock(port->lock); | 535 base::AutoLock lock(port->lock); |
507 | 536 |
508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
509 << " pointing to " | 538 << " pointing to " |
510 << port->peer_port_name << "@" << port->peer_node_name; | 539 << port->peer_port_name << "@" << port->peer_node_name; |
511 | 540 |
512 if (port->state != Port::kBuffering) | 541 return BeginProxying_Locked(port.get(), port_name); |
513 return OOPS(ERROR_PORT_STATE_UNEXPECTED); | |
514 | |
515 port->state = Port::kProxying; | |
516 | |
517 int rv = ForwardMessages_Locked(port.get(), port_name); | |
518 if (rv != OK) | |
519 return rv; | |
520 | |
521 // We may have observed closure before receiving PortAccepted. In that | |
522 // case, we can advance to removing the proxy without sending out an | |
523 // ObserveProxy message. We already know the last expected message, etc. | |
524 | |
525 if (port->remove_proxy_on_last_message) { | |
526 MaybeRemoveProxy_Locked(port.get(), port_name); | |
527 | |
528 // Make sure we propagate closure to our current peer. | |
529 ObserveClosureEventData data; | |
530 data.last_sequence_num = port->last_sequence_num_to_receive; | |
531 delegate_->ForwardMessage( | |
532 port->peer_node_name, | |
533 NewInternalMessage(port->peer_port_name, | |
534 EventType::kObserveClosure, data)); | |
535 } else { | |
536 InitiateProxyRemoval_Locked(port.get(), port_name); | |
537 } | |
538 } | 542 } |
539 return OK; | |
540 } | 543 } |
541 | 544 |
542 int Node::OnObserveProxy(const PortName& port_name, | 545 int Node::OnObserveProxy(const PortName& port_name, |
543 const ObserveProxyEventData& event) { | 546 const ObserveProxyEventData& event) { |
544 // The port may have already been closed locally, in which case the | 547 // The port may have already been closed locally, in which case the |
545 // ObserveClosure message will contain the last_sequence_num field. | 548 // ObserveClosure message will contain the last_sequence_num field. |
546 // We can then silently ignore this message. | 549 // We can then silently ignore this message. |
547 scoped_refptr<Port> port = GetPort(port_name); | 550 scoped_refptr<Port> port = GetPort(port_name); |
548 if (!port) { | 551 if (!port) { |
549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
714 NewInternalMessage(port->peer_port_name, | 717 NewInternalMessage(port->peer_port_name, |
715 EventType::kObserveClosure, forwarded_data)); | 718 EventType::kObserveClosure, forwarded_data)); |
716 } | 719 } |
717 if (notify_delegate) { | 720 if (notify_delegate) { |
718 PortRef port_ref(port_name, port); | 721 PortRef port_ref(port_name, port); |
719 delegate_->PortStatusChanged(port_ref); | 722 delegate_->PortStatusChanged(port_ref); |
720 } | 723 } |
721 return OK; | 724 return OK; |
722 } | 725 } |
723 | 726 |
| 727 int Node::OnMergePort(const PortName& port_name, |
| 728 const MergePortEventData& event) { |
| 729 scoped_refptr<Port> port = GetPort(port_name); |
| 730 if (!port) |
| 731 return ERROR_PORT_UNKNOWN; |
| 732 |
| 733 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" |
| 734 << port->state << ") merging with proxy " << event.new_port_name |
| 735 << "@" << name_ << " pointing to " |
| 736 << event.new_port_descriptor.peer_port_name << "@" |
| 737 << event.new_port_descriptor.peer_node_name << " referred by " |
| 738 << event.new_port_descriptor.referring_port_name << "@" |
| 739 << event.new_port_descriptor.referring_node_name; |
| 740 |
| 741 bool close_target_port = false; |
| 742 bool close_new_port = false; |
| 743 |
| 744 // Accept the new port. This is now the receiving end of the other port cycle |
| 745 // to be merged with ours. |
| 746 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); |
| 747 if (rv != OK) { |
| 748 close_target_port = true; |
| 749 } else { |
| 750 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn |
| 751 // needs to hold |ports_lock_|. We also acquire multiple port locks within. |
| 752 base::AutoLock ports_lock(ports_lock_); |
| 753 base::AutoLock lock(port->lock); |
| 754 |
| 755 if (port->state != Port::kReceiving) { |
| 756 close_new_port = true; |
| 757 } else { |
| 758 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); |
| 759 base::AutoLock new_port_lock(new_port->lock); |
| 760 DCHECK(new_port->state == Port::kReceiving); |
| 761 |
| 762 // Both ports are locked. Now all we have to do is swap their peer |
| 763 // information and set them up as proxies. |
| 764 |
| 765 std::swap(port->peer_node_name, new_port->peer_node_name); |
| 766 std::swap(port->peer_port_name, new_port->peer_port_name); |
| 767 std::swap(port->peer_closed, new_port->peer_closed); |
| 768 |
| 769 port->state = Port::kBuffering; |
| 770 if (port->peer_closed) |
| 771 port->remove_proxy_on_last_message = true; |
| 772 |
| 773 new_port->state = Port::kBuffering; |
| 774 if (new_port->peer_closed) |
| 775 new_port->remove_proxy_on_last_message = true; |
| 776 |
| 777 int rv1 = BeginProxying_Locked(port.get(), port_name); |
| 778 int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); |
| 779 |
| 780 if (rv1 == OK && rv2 == OK) |
| 781 return OK; |
| 782 |
| 783 // If either proxy failed to initialize (e.g. had undeliverable messages |
| 784 // or ended up in a bad state somehow), we keep the system in a consistent |
| 785 // state by undoing the peer swap and closing both merge ports. |
| 786 |
| 787 std::swap(port->peer_node_name, new_port->peer_node_name); |
| 788 std::swap(port->peer_port_name, new_port->peer_port_name); |
| 789 port->state = Port::kReceiving; |
| 790 new_port->state = Port::kReceiving; |
| 791 close_new_port = true; |
| 792 close_target_port = true; |
| 793 } |
| 794 } |
| 795 |
| 796 if (close_target_port) { |
| 797 PortRef target_port; |
| 798 rv = GetPort(port_name, &target_port); |
| 799 DCHECK(rv == OK); |
| 800 |
| 801 ClosePort(target_port); |
| 802 } |
| 803 |
| 804 if (close_new_port) { |
| 805 PortRef new_port; |
| 806 rv = GetPort(event.new_port_name, &new_port); |
| 807 DCHECK(rv == OK); |
| 808 |
| 809 ClosePort(new_port); |
| 810 } |
| 811 |
| 812 return ERROR_PORT_STATE_UNEXPECTED; |
| 813 } |
| 814 |
724 int Node::AddPortWithName(const PortName& port_name, | 815 int Node::AddPortWithName(const PortName& port_name, |
725 const scoped_refptr<Port>& port) { | 816 const scoped_refptr<Port>& port) { |
726 base::AutoLock lock(ports_lock_); | 817 base::AutoLock lock(ports_lock_); |
727 | 818 |
728 if (!ports_.insert(std::make_pair(port_name, port)).second) | 819 if (!ports_.insert(std::make_pair(port_name, port)).second) |
729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. | 820 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. |
730 | 821 |
731 DVLOG(2) << "Created port " << port_name << "@" << name_; | 822 DVLOG(2) << "Created port " << port_name << "@" << name_; |
732 return OK; | 823 return OK; |
733 } | 824 } |
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
901 << GetEventData<UserEventData>(*message)->sequence_num | 992 << GetEventData<UserEventData>(*message)->sequence_num |
902 << " [ports=" << ports_buf.str() << "]" | 993 << " [ports=" << ports_buf.str() << "]" |
903 << " from " << port_name << "@" << name_ | 994 << " from " << port_name << "@" << name_ |
904 << " to " << port->peer_port_name << "@" << port->peer_node_name; | 995 << " to " << port->peer_port_name << "@" << port->peer_node_name; |
905 #endif | 996 #endif |
906 | 997 |
907 GetMutableEventHeader(message)->port_name = port->peer_port_name; | 998 GetMutableEventHeader(message)->port_name = port->peer_port_name; |
908 return OK; | 999 return OK; |
909 } | 1000 } |
910 | 1001 |
| 1002 int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { |
| 1003 ports_lock_.AssertAcquired(); |
| 1004 port->lock.AssertAcquired(); |
| 1005 |
| 1006 if (port->state != Port::kBuffering) |
| 1007 return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
| 1008 |
| 1009 port->state = Port::kProxying; |
| 1010 |
| 1011 int rv = ForwardMessages_Locked(port, port_name); |
| 1012 if (rv != OK) |
| 1013 return rv; |
| 1014 |
| 1015 // We may have observed closure while buffering. In that case, we can advance |
| 1016 // to removing the proxy without sending out an ObserveProxy message. We |
| 1017 // already know the last expected message, etc. |
| 1018 |
| 1019 if (port->remove_proxy_on_last_message) { |
| 1020 MaybeRemoveProxy_Locked(port, port_name); |
| 1021 |
| 1022 // Make sure we propagate closure to our current peer. |
| 1023 ObserveClosureEventData data; |
| 1024 data.last_sequence_num = port->last_sequence_num_to_receive; |
| 1025 delegate_->ForwardMessage( |
| 1026 port->peer_node_name, |
| 1027 NewInternalMessage(port->peer_port_name, |
| 1028 EventType::kObserveClosure, data)); |
| 1029 } else { |
| 1030 InitiateProxyRemoval_Locked(port, port_name); |
| 1031 } |
| 1032 |
| 1033 return OK; |
| 1034 } |
| 1035 |
911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { | 1036 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
912 ports_lock_.AssertAcquired(); | 1037 ports_lock_.AssertAcquired(); |
913 port->lock.AssertAcquired(); | 1038 port->lock.AssertAcquired(); |
914 | 1039 |
915 for (;;) { | 1040 for (;;) { |
916 ScopedMessage message; | 1041 ScopedMessage message; |
917 port->message_queue.GetNextMessageIf(nullptr, &message); | 1042 port->message_queue.GetNextMessageIf(nullptr, &message); |
918 if (!message) | 1043 if (!message) |
919 break; | 1044 break; |
920 | 1045 |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
989 | 1114 |
990 if (num_data_bytes) | 1115 if (num_data_bytes) |
991 memcpy(header + 1, data, num_data_bytes); | 1116 memcpy(header + 1, data, num_data_bytes); |
992 | 1117 |
993 return message; | 1118 return message; |
994 } | 1119 } |
995 | 1120 |
996 } // namespace ports | 1121 } // namespace ports |
997 } // namespace edk | 1122 } // namespace edk |
998 } // namespace mojo | 1123 } // namespace mojo |
OLD | NEW |