Index: mojo/edk/system/ports/node.cc |
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc |
index 1bee00caf1ff3bab8524d578fbe353bd89764316..5e1e9ffe4b56466da259635ea2f1a6a7befc8860 100644 |
--- a/mojo/edk/system/ports/node.cc |
+++ b/mojo/edk/system/ports/node.cc |
@@ -397,49 +397,7 @@ int Node::LostConnectionToNode(const NodeName& node_name) { |
DVLOG(1) << "Observing lost connection from node " << name_ |
<< " to node " << node_name; |
- std::vector<PortRef> ports_to_notify; |
- |
- { |
- base::AutoLock ports_lock(ports_lock_); |
- |
- for (auto iter = ports_.begin(); iter != ports_.end(); ) { |
- scoped_refptr<Port>& port = iter->second; |
- |
- bool remove_port = false; |
- { |
- base::AutoLock port_lock(port->lock); |
- |
- if (port->peer_node_name == node_name) { |
- // We can no longer send messages to this port's peer. We assume we |
- // will not receive any more messages from this port's peer as well. |
- if (!port->peer_closed) { |
- port->peer_closed = true; |
- port->last_sequence_num_to_receive = |
- port->message_queue.next_sequence_num() - 1; |
- |
- if (port->state == Port::kReceiving) |
- ports_to_notify.push_back(PortRef(iter->first, port)); |
- } |
- |
- // We do not expect to forward any further messages, and we do not |
- // expect to receive a Port{Accepted,Rejected} event. |
- if (port->state != Port::kReceiving) |
- remove_port = true; |
- } |
- } |
- |
- if (remove_port) { |
- DVLOG(2) << "Deleted port " << iter->first << "@" << name_; |
- iter = ports_.erase(iter); |
- } else { |
- ++iter; |
- } |
- } |
- } |
- |
- for (size_t i = 0; i < ports_to_notify.size(); ++i) |
- delegate_->PortStatusChanged(ports_to_notify[i]); |
- |
+ DestroyAllPortsWithPeer(node_name, kInvalidPortName); |
return OK; |
} |
@@ -547,6 +505,20 @@ int Node::OnPortAccepted(const PortName& port_name) { |
int Node::OnObserveProxy(const PortName& port_name, |
const ObserveProxyEventData& event) { |
+ if (port_name == kInvalidPortName) { |
+ // An ObserveProxy with an invalid target port name is a broadcast used to |
+ // inform ports when their peer (which was itself a proxy) has become |
+ // defunct due to unexpected node disconnection. |
+ // |
+ // Receiving ports affected by this treat it as equivalent to peer closure. |
+ // Proxies affected by this can be removed and will in turn broadcast their |
+ // own death with a similar message. |
+ CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName); |
+ CHECK_EQ(event.proxy_to_port_name, kInvalidPortName); |
+ DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name); |
+ return OK; |
+ } |
+ |
// The port may have already been closed locally, in which case the |
// ObserveClosure message will contain the last_sequence_num field. |
// We can then silently ignore this message. |
@@ -1226,6 +1198,79 @@ void Node::MaybeRemoveProxy_Locked(Port* port, |
} |
} |
+void Node::DestroyAllPortsWithPeer(const NodeName& node_name, |
+ const PortName& port_name) { |
+ // Wipes out all ports whose peer node matches |node_name| and whose peer port |
+ // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer |
+ // node is matched. |
+ |
+ std::vector<PortRef> ports_to_notify; |
+ std::vector<PortName> dead_proxies_to_broadcast; |
+ |
+ { |
+ base::AutoLock ports_lock(ports_lock_); |
+ |
+ for (auto iter = ports_.begin(); iter != ports_.end(); ) { |
+ scoped_refptr<Port>& port = iter->second; |
Anand Mistry (off Chromium)
2016/05/16 04:03:57
very subjective nit: Looks like using Port* is a f
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
|
+ |
+ bool remove_port = false; |
+ { |
+ base::AutoLock port_lock(port->lock); |
+ |
+ if (port->peer_node_name == node_name && |
+ (port_name == kInvalidPortName || |
+ port->peer_port_name == port_name)) { |
+ if (!port->peer_closed) { |
+ // Treat this as immediate peer closure. It's an exceptional |
+ // condition akin to a broken pipe, so we don't care about losing |
+ // messages. |
+ |
+ port->peer_closed = true; |
+ port->last_sequence_num_to_receive = |
+ port->message_queue.next_sequence_num() - 1; |
+ |
+ if (port->state == Port::kReceiving) |
+ ports_to_notify.push_back(PortRef(iter->first, port)); |
+ } |
+ |
+ // We don't expect to forward any further messages, and we don't |
+ // expect to receive a Port{Accepted,Rejected} event. Because we're |
+ // a proxy with no active peer, we cannot use the normal proxy removal |
+ // procedure of forward-propagating an ObserveProxy. Instead we |
+ // broadcast our own death so it can be back-propagated. This is |
+ // inefficient but rare. |
+ if (port->state != Port::kReceiving) { |
+ remove_port = true; |
+ dead_proxies_to_broadcast.push_back(iter->first); |
+ } |
+ } |
+ } |
+ |
+ if (remove_port) { |
+ DVLOG(2) << "Forcibly deleted port " << iter->first << "@" << name_; |
+ iter = ports_.erase(iter); |
Anand Mistry (off Chromium)
2016/05/16 04:03:57
um... according to http://en.cppreference.com/w/cp
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Yeah, that makes me uncomfortable. I changed it to
|
+ } else { |
+ ++iter; |
+ } |
+ } |
+ } |
+ |
+ // Wake up any receiving ports who have just observed simulated peer closure. |
+ for (size_t i = 0; i < ports_to_notify.size(); ++i) |
Anand Mistry (off Chromium)
2016/05/16 04:03:57
for (const auto& port : ports_to_notify)
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
|
+ delegate_->PortStatusChanged(ports_to_notify[i]); |
+ |
+ for (const auto& proxy_name : dead_proxies_to_broadcast) { |
+ // Broadcast an event signifying that this proxy is no longer functioning. |
+ ObserveProxyEventData event; |
+ event.proxy_node_name = name_; |
+ event.proxy_port_name = proxy_name; |
+ event.proxy_to_node_name = kInvalidNodeName; |
+ event.proxy_to_port_name = kInvalidPortName; |
+ delegate_->BroadcastMessage(NewInternalMessage( |
+ kInvalidPortName, EventType::kObserveProxy, event)); |
+ } |
+} |
+ |
ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, |
const EventType& type, |
const void* data, |