OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
(...skipping 336 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
347 header->port_name, | 347 header->port_name, |
348 *GetEventData<ObserveProxyEventData>(*message)); | 348 *GetEventData<ObserveProxyEventData>(*message)); |
349 case EventType::kObserveProxyAck: | 349 case EventType::kObserveProxyAck: |
350 return OnObserveProxyAck( | 350 return OnObserveProxyAck( |
351 header->port_name, | 351 header->port_name, |
352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); | 352 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); |
353 case EventType::kObserveClosure: | 353 case EventType::kObserveClosure: |
354 return OnObserveClosure( | 354 return OnObserveClosure( |
355 header->port_name, | 355 header->port_name, |
356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); | 356 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); |
357 case EventType::kMergePort: | |
358 return OnMergePort(header->port_name, | |
359 *GetEventData<MergePortEventData>(*message)); | |
357 } | 360 } |
358 return OOPS(ERROR_NOT_IMPLEMENTED); | 361 return OOPS(ERROR_NOT_IMPLEMENTED); |
359 } | 362 } |
360 | 363 |
364 int Node::MergePorts(const PortRef& port_ref, | |
365 const NodeName& destination_node_name, | |
366 const PortName& destination_port_name) { | |
367 Port* port = port_ref.port(); | |
368 { | |
369 // |ports_lock_| must be held for WillSendPort_Locked below. | |
370 base::AutoLock ports_lock(ports_lock_); | |
371 base::AutoLock lock(port->lock); | |
372 | |
373 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ | |
374 << " to " << destination_port_name << "@" << destination_node_name; | |
375 | |
376 // Send the port-to-merge over to the destination node so it can be merged | |
377 // into the port cycle atomically there. | |
378 MergePortEventData data; | |
379 data.new_port_name = port_ref.name(); | |
380 WillSendPort_Locked(port, destination_node_name, &data.new_port_name, | |
381 &data.new_port_descriptor); | |
382 delegate_->ForwardMessage( | |
383 destination_node_name, | |
384 NewInternalMessage(destination_port_name, | |
385 EventType::kMergePort, data)); | |
386 } | |
387 return OK; | |
388 } | |
389 | |
361 int Node::LostConnectionToNode(const NodeName& node_name) { | 390 int Node::LostConnectionToNode(const NodeName& node_name) { |
362 // We can no longer send events to the given node. We also can't expect any | 391 // We can no longer send events to the given node. We also can't expect any |
363 // PortAccepted events. | 392 // PortAccepted events. |
364 | 393 |
365 DVLOG(1) << "Observing lost connection from node " << name_ | 394 DVLOG(1) << "Observing lost connection from node " << name_ |
366 << " to node " << node_name; | 395 << " to node " << node_name; |
367 | 396 |
368 std::vector<PortRef> ports_to_notify; | 397 std::vector<PortRef> ports_to_notify; |
369 | 398 |
370 { | 399 { |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
502 { | 531 { |
503 // We must hold |ports_lock_| before grabbing the port lock because | 532 // We must hold |ports_lock_| before grabbing the port lock because |
504 // ForwardMessages_Locked requires it to be held. | 533 // ForwardMessages_Locked requires it to be held. |
505 base::AutoLock ports_lock(ports_lock_); | 534 base::AutoLock ports_lock(ports_lock_); |
506 base::AutoLock lock(port->lock); | 535 base::AutoLock lock(port->lock); |
507 | 536 |
508 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 537 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
509 << " pointing to " | 538 << " pointing to " |
510 << port->peer_port_name << "@" << port->peer_node_name; | 539 << port->peer_port_name << "@" << port->peer_node_name; |
511 | 540 |
512 if (port->state != Port::kBuffering) | 541 return BeginProxying_Locked(port.get(), port_name); |
513 return OOPS(ERROR_PORT_STATE_UNEXPECTED); | |
514 | |
515 port->state = Port::kProxying; | |
516 | |
517 int rv = ForwardMessages_Locked(port.get(), port_name); | |
518 if (rv != OK) | |
519 return rv; | |
520 | |
521 // We may have observed closure before receiving PortAccepted. In that | |
522 // case, we can advance to removing the proxy without sending out an | |
523 // ObserveProxy message. We already know the last expected message, etc. | |
524 | |
525 if (port->remove_proxy_on_last_message) { | |
526 MaybeRemoveProxy_Locked(port.get(), port_name); | |
527 | |
528 // Make sure we propagate closure to our current peer. | |
529 ObserveClosureEventData data; | |
530 data.last_sequence_num = port->last_sequence_num_to_receive; | |
531 delegate_->ForwardMessage( | |
532 port->peer_node_name, | |
533 NewInternalMessage(port->peer_port_name, | |
534 EventType::kObserveClosure, data)); | |
535 } else { | |
536 InitiateProxyRemoval_Locked(port.get(), port_name); | |
537 } | |
538 } | 542 } |
539 return OK; | |
540 } | 543 } |
541 | 544 |
542 int Node::OnObserveProxy(const PortName& port_name, | 545 int Node::OnObserveProxy(const PortName& port_name, |
543 const ObserveProxyEventData& event) { | 546 const ObserveProxyEventData& event) { |
544 // The port may have already been closed locally, in which case the | 547 // The port may have already been closed locally, in which case the |
545 // ObserveClosure message will contain the last_sequence_num field. | 548 // ObserveClosure message will contain the last_sequence_num field. |
546 // We can then silently ignore this message. | 549 // We can then silently ignore this message. |
547 scoped_refptr<Port> port = GetPort(port_name); | 550 scoped_refptr<Port> port = GetPort(port_name); |
548 if (!port) { | 551 if (!port) { |
549 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 552 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
714 NewInternalMessage(port->peer_port_name, | 717 NewInternalMessage(port->peer_port_name, |
715 EventType::kObserveClosure, forwarded_data)); | 718 EventType::kObserveClosure, forwarded_data)); |
716 } | 719 } |
717 if (notify_delegate) { | 720 if (notify_delegate) { |
718 PortRef port_ref(port_name, port); | 721 PortRef port_ref(port_name, port); |
719 delegate_->PortStatusChanged(port_ref); | 722 delegate_->PortStatusChanged(port_ref); |
720 } | 723 } |
721 return OK; | 724 return OK; |
722 } | 725 } |
723 | 726 |
727 int Node::OnMergePort(const PortName& port_name, | |
728 const MergePortEventData& event) { | |
729 scoped_refptr<Port> port = GetPort(port_name); | |
730 if (!port) | |
731 return ERROR_PORT_UNKNOWN; | |
732 | |
733 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" | |
734 << port->state << ") merging with proxy " << event.new_port_name | |
735 << "@" << name_ << " pointing to " | |
736 << event.new_port_descriptor.peer_port_name << "@" | |
737 << event.new_port_descriptor.peer_node_name << " referred by " | |
738 << event.new_port_descriptor.referring_port_name << "@" | |
739 << event.new_port_descriptor.referring_node_name; | |
740 | |
741 // Accept the new port. This is now the receiving end of the other port cycle | |
742 // to be merged with ours. | |
743 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); | |
744 if (rv != OK) | |
745 return rv; | |
746 | |
747 { | |
748 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn | |
749 // needs to hold |ports_lock_|. We also acquire multiple port locks within. | |
750 base::AutoLock ports_lock(ports_lock_); | |
751 | |
752 base::AutoLock lock(port->lock); | |
753 | |
754 if (port->state != Port::kReceiving) | |
755 return ERROR_PORT_STATE_UNEXPECTED; | |
756 | |
757 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); | |
darin (slow to review)
2016/02/05 17:46:45
should we make some assertions about the state of
Ken Rockot(use gerrit already)
2016/02/05 19:43:05
done
| |
758 base::AutoLock new_port_lock(new_port->lock); | |
759 | |
760 // Both ports are locked. Now all we have to do is swap their peer | |
761 // information and set them up as proxies. | |
762 | |
763 std::swap(port->peer_node_name, new_port->peer_node_name); | |
764 std::swap(port->peer_port_name, new_port->peer_port_name); | |
765 | |
766 port->state = Port::kBuffering; | |
767 if (port->peer_closed) | |
768 port->remove_proxy_on_last_message = true; | |
769 | |
770 new_port->state = Port::kBuffering; | |
771 if (new_port->peer_closed) | |
772 new_port->remove_proxy_on_last_message = true; | |
773 | |
774 rv = BeginProxying_Locked(port.get(), port_name); | |
775 if (rv != OK) | |
776 return rv; | |
darin (slow to review)
2016/02/05 17:46:45
Are we in a bad state potentially if we early retu
Ken Rockot(use gerrit already)
2016/02/05 19:43:05
Yeah, I kinda glossed over this before. Changed it
| |
777 | |
778 return BeginProxying_Locked(new_port.get(), event.new_port_name); | |
779 } | |
780 } | |
781 | |
724 int Node::AddPortWithName(const PortName& port_name, | 782 int Node::AddPortWithName(const PortName& port_name, |
725 const scoped_refptr<Port>& port) { | 783 const scoped_refptr<Port>& port) { |
726 base::AutoLock lock(ports_lock_); | 784 base::AutoLock lock(ports_lock_); |
727 | 785 |
728 if (!ports_.insert(std::make_pair(port_name, port)).second) | 786 if (!ports_.insert(std::make_pair(port_name, port)).second) |
729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. | 787 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. |
730 | 788 |
731 DVLOG(2) << "Created port " << port_name << "@" << name_; | 789 DVLOG(2) << "Created port " << port_name << "@" << name_; |
732 return OK; | 790 return OK; |
733 } | 791 } |
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
901 << GetEventData<UserEventData>(*message)->sequence_num | 959 << GetEventData<UserEventData>(*message)->sequence_num |
902 << " [ports=" << ports_buf.str() << "]" | 960 << " [ports=" << ports_buf.str() << "]" |
903 << " from " << port_name << "@" << name_ | 961 << " from " << port_name << "@" << name_ |
904 << " to " << port->peer_port_name << "@" << port->peer_node_name; | 962 << " to " << port->peer_port_name << "@" << port->peer_node_name; |
905 #endif | 963 #endif |
906 | 964 |
907 GetMutableEventHeader(message)->port_name = port->peer_port_name; | 965 GetMutableEventHeader(message)->port_name = port->peer_port_name; |
908 return OK; | 966 return OK; |
909 } | 967 } |
910 | 968 |
969 int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { | |
970 ports_lock_.AssertAcquired(); | |
971 port->lock.AssertAcquired(); | |
972 | |
973 if (port->state != Port::kBuffering) | |
974 return OOPS(ERROR_PORT_STATE_UNEXPECTED); | |
975 | |
976 port->state = Port::kProxying; | |
977 | |
978 int rv = ForwardMessages_Locked(port, port_name); | |
979 if (rv != OK) | |
980 return rv; | |
981 | |
982 // We may have observed closure while buffering. In that case, we can advance | |
983 // to removing the proxy without sending out an ObserveProxy message. We | |
984 // already know the last expected message, etc. | |
985 | |
986 if (port->remove_proxy_on_last_message) { | |
987 MaybeRemoveProxy_Locked(port, port_name); | |
988 | |
989 // Make sure we propagate closure to our current peer. | |
990 ObserveClosureEventData data; | |
991 data.last_sequence_num = port->last_sequence_num_to_receive; | |
992 delegate_->ForwardMessage( | |
993 port->peer_node_name, | |
994 NewInternalMessage(port->peer_port_name, | |
995 EventType::kObserveClosure, data)); | |
996 } else { | |
997 InitiateProxyRemoval_Locked(port, port_name); | |
998 } | |
999 | |
1000 return OK; | |
1001 } | |
1002 | |
911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { | 1003 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
912 ports_lock_.AssertAcquired(); | 1004 ports_lock_.AssertAcquired(); |
913 port->lock.AssertAcquired(); | 1005 port->lock.AssertAcquired(); |
914 | 1006 |
915 for (;;) { | 1007 for (;;) { |
916 ScopedMessage message; | 1008 ScopedMessage message; |
917 port->message_queue.GetNextMessageIf(nullptr, &message); | 1009 port->message_queue.GetNextMessageIf(nullptr, &message); |
918 if (!message) | 1010 if (!message) |
919 break; | 1011 break; |
920 | 1012 |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
989 | 1081 |
990 if (num_data_bytes) | 1082 if (num_data_bytes) |
991 memcpy(header + 1, data, num_data_bytes); | 1083 memcpy(header + 1, data, num_data_bytes); |
992 | 1084 |
993 return message; | 1085 return message; |
994 } | 1086 } |
995 | 1087 |
996 } // namespace ports | 1088 } // namespace ports |
997 } // namespace edk | 1089 } // namespace edk |
998 } // namespace mojo | 1090 } // namespace mojo |
OLD | NEW |