Index: mojo/edk/system/ports/node.cc |
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc |
index f20acb0759db3cda826be2e1529330c7ac0a690a..630a1a2680c26be0dba4a710ba18b2e9ae095071 100644 |
--- a/mojo/edk/system/ports/node.cc |
+++ b/mojo/edk/system/ports/node.cc |
@@ -410,49 +410,7 @@ int Node::LostConnectionToNode(const NodeName& node_name) { |
DVLOG(1) << "Observing lost connection from node " << name_ |
<< " to node " << node_name; |
- std::vector<PortRef> ports_to_notify; |
- |
- { |
- base::AutoLock ports_lock(ports_lock_); |
- |
- for (auto iter = ports_.begin(); iter != ports_.end(); ) { |
- scoped_refptr<Port>& port = iter->second; |
- |
- bool remove_port = false; |
- { |
- base::AutoLock port_lock(port->lock); |
- |
- if (port->peer_node_name == node_name) { |
- // We can no longer send messages to this port's peer. We assume we |
- // will not receive any more messages from this port's peer as well. |
- if (!port->peer_closed) { |
- port->peer_closed = true; |
- port->last_sequence_num_to_receive = |
- port->message_queue.next_sequence_num() - 1; |
- |
- if (port->state == Port::kReceiving) |
- ports_to_notify.push_back(PortRef(iter->first, port)); |
- } |
- |
- // We do not expect to forward any further messages, and we do not |
- // expect to receive a Port{Accepted,Rejected} event. |
- if (port->state != Port::kReceiving) |
- remove_port = true; |
- } |
- } |
- |
- if (remove_port) { |
- DVLOG(2) << "Deleted port " << iter->first << "@" << name_; |
- iter = ports_.erase(iter); |
- } else { |
- ++iter; |
- } |
- } |
- } |
- |
- for (size_t i = 0; i < ports_to_notify.size(); ++i) |
- delegate_->PortStatusChanged(ports_to_notify[i]); |
- |
+ DestroyAllPortsWithPeer(node_name, kInvalidPortName); |
return OK; |
} |
@@ -553,8 +511,19 @@ int Node::OnPortAccepted(const PortName& port_name) { |
int Node::OnObserveProxy(const PortName& port_name, |
const ObserveProxyEventData& event) { |
- if (port_name == kInvalidPortName) |
+ if (port_name == kInvalidPortName) { |
+ // An ObserveProxy with an invalid target port name is a broadcast used to |
+ // inform ports when their peer (which was itself a proxy) has become |
+ // defunct due to unexpected node disconnection. |
+ // |
+ // Receiving ports affected by this treat it as equivalent to peer closure. |
+ // Proxies affected by this can be removed and will in turn broadcast their |
+ // own death with a similar message. |
+ CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName); |
+ CHECK_EQ(event.proxy_to_port_name, kInvalidPortName); |
+ DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name); |
return OK; |
+ } |
// The port may have already been closed locally, in which case the |
// ObserveClosure message will contain the last_sequence_num field. |
@@ -1318,6 +1287,84 @@ void Node::TryRemoveProxy(PortRef port_ref) { |
delegate_->ForwardMessage(to_node, std::move(msg)); |
} |
+void Node::DestroyAllPortsWithPeer(const NodeName& node_name, |
+ const PortName& port_name) { |
+ // Wipes out all ports whose peer node matches |node_name| and whose peer port |
+ // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer |
+ // node is matched. |
+ |
+ std::vector<PortRef> ports_to_notify; |
+ std::vector<PortName> dead_proxies_to_broadcast; |
+ std::deque<PortName> referenced_port_names; |
+ |
+ { |
+ base::AutoLock ports_lock(ports_lock_); |
+ |
+ for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) { |
+ Port* port = iter->second.get(); |
+ { |
+ base::AutoLock port_lock(port->lock); |
+ |
+ if (port->peer_node_name == node_name && |
+ (port_name == kInvalidPortName || |
+ port->peer_port_name == port_name)) { |
+ if (!port->peer_closed) { |
+ // Treat this as immediate peer closure. It's an exceptional |
+ // condition akin to a broken pipe, so we don't care about losing |
+ // messages. |
+ |
+ port->peer_closed = true; |
+ port->last_sequence_num_to_receive = |
+ port->message_queue.next_sequence_num() - 1; |
+ |
+ if (port->state == Port::kReceiving) |
+ ports_to_notify.push_back(PortRef(iter->first, port)); |
+ } |
+ |
+ // We don't expect to forward any further messages, and we don't |
+ // expect to receive a Port{Accepted,Rejected} event. Because we're |
+ // a proxy with no active peer, we cannot use the normal proxy removal |
+ // procedure of forward-propagating an ObserveProxy. Instead we |
+ // broadcast our own death so it can be back-propagated. This is |
+ // inefficient but rare. |
+ if (port->state != Port::kReceiving) { |
+ dead_proxies_to_broadcast.push_back(iter->first); |
+ iter->second->message_queue.GetReferencedPorts( |
+ &referenced_port_names); |
+ } |
+ } |
+ } |
+ } |
+ |
+ for (const auto& proxy_name : dead_proxies_to_broadcast) { |
+ ports_.erase(proxy_name); |
+ DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_; |
+ } |
+ } |
+ |
+ // Wake up any receiving ports who have just observed simulated peer closure. |
+ for (const auto& port : ports_to_notify) |
+ delegate_->PortStatusChanged(port); |
+ |
+ for (const auto& proxy_name : dead_proxies_to_broadcast) { |
+ // Broadcast an event signifying that this proxy is no longer functioning. |
+ ObserveProxyEventData event; |
+ event.proxy_node_name = name_; |
+ event.proxy_port_name = proxy_name; |
+ event.proxy_to_node_name = kInvalidNodeName; |
+ event.proxy_to_port_name = kInvalidPortName; |
+ delegate_->BroadcastMessage(NewInternalMessage( |
+ kInvalidPortName, EventType::kObserveProxy, event)); |
+ } |
+ |
+ // Close any ports referenced by the closed proxies. |
+ for (const auto& name : referenced_port_names) { |
+ PortRef ref; |
+ if (GetPort(name, &ref) == OK) |
+ ClosePort(ref); |
+ } |
+} |
+ |
ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, |
const EventType& type, |
const void* data, |