Chromium Code Reviews| Index: mojo/edk/system/ports/node.cc |
| diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc |
| index 1bee00caf1ff3bab8524d578fbe353bd89764316..5e1e9ffe4b56466da259635ea2f1a6a7befc8860 100644 |
| --- a/mojo/edk/system/ports/node.cc |
| +++ b/mojo/edk/system/ports/node.cc |
| @@ -397,49 +397,7 @@ int Node::LostConnectionToNode(const NodeName& node_name) { |
| DVLOG(1) << "Observing lost connection from node " << name_ |
| << " to node " << node_name; |
| - std::vector<PortRef> ports_to_notify; |
| - |
| - { |
| - base::AutoLock ports_lock(ports_lock_); |
| - |
| - for (auto iter = ports_.begin(); iter != ports_.end(); ) { |
| - scoped_refptr<Port>& port = iter->second; |
| - |
| - bool remove_port = false; |
| - { |
| - base::AutoLock port_lock(port->lock); |
| - |
| - if (port->peer_node_name == node_name) { |
| - // We can no longer send messages to this port's peer. We assume we |
| - // will not receive any more messages from this port's peer as well. |
| - if (!port->peer_closed) { |
| - port->peer_closed = true; |
| - port->last_sequence_num_to_receive = |
| - port->message_queue.next_sequence_num() - 1; |
| - |
| - if (port->state == Port::kReceiving) |
| - ports_to_notify.push_back(PortRef(iter->first, port)); |
| - } |
| - |
| - // We do not expect to forward any further messages, and we do not |
| - // expect to receive a Port{Accepted,Rejected} event. |
| - if (port->state != Port::kReceiving) |
| - remove_port = true; |
| - } |
| - } |
| - |
| - if (remove_port) { |
| - DVLOG(2) << "Deleted port " << iter->first << "@" << name_; |
| - iter = ports_.erase(iter); |
| - } else { |
| - ++iter; |
| - } |
| - } |
| - } |
| - |
| - for (size_t i = 0; i < ports_to_notify.size(); ++i) |
| - delegate_->PortStatusChanged(ports_to_notify[i]); |
| - |
| + DestroyAllPortsWithPeer(node_name, kInvalidPortName); |
| return OK; |
| } |
| @@ -547,6 +505,20 @@ int Node::OnPortAccepted(const PortName& port_name) { |
| int Node::OnObserveProxy(const PortName& port_name, |
| const ObserveProxyEventData& event) { |
| + if (port_name == kInvalidPortName) { |
| + // An ObserveProxy with an invalid target port name is a broadcast used to |
| + // inform ports when their peer (which was itself a proxy) has become |
| + // defunct due to unexpected node disconnection. |
| + // |
| + // Receiving ports affected by this treat it as equivalent to peer closure. |
| + // Proxies affected by this can be removed and will in turn broadcast their |
| + // own death with a similar message. |
| + CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName); |
| + CHECK_EQ(event.proxy_to_port_name, kInvalidPortName); |
| + DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name); |
| + return OK; |
| + } |
| + |
| // The port may have already been closed locally, in which case the |
| // ObserveClosure message will contain the last_sequence_num field. |
| // We can then silently ignore this message. |
| @@ -1226,6 +1198,79 @@ void Node::MaybeRemoveProxy_Locked(Port* port, |
| } |
| } |
| +void Node::DestroyAllPortsWithPeer(const NodeName& node_name, |
| + const PortName& port_name) { |
| + // Wipes out all ports whose peer node matches |node_name| and whose peer port |
| + // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer |
| + // node is matched. |
| + |
| + std::vector<PortRef> ports_to_notify; |
| + std::vector<PortName> dead_proxies_to_broadcast; |
| + |
| + { |
| + base::AutoLock ports_lock(ports_lock_); |
| + |
| + for (auto iter = ports_.begin(); iter != ports_.end(); ) { |
| + scoped_refptr<Port>& port = iter->second; |
|
Anand Mistry (off Chromium)
2016/05/16 04:03:57
very subjective nit: Looks like using Port* is a f
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
|
| + |
| + bool remove_port = false; |
| + { |
| + base::AutoLock port_lock(port->lock); |
| + |
| + if (port->peer_node_name == node_name && |
| + (port_name == kInvalidPortName || |
| + port->peer_port_name == port_name)) { |
| + if (!port->peer_closed) { |
| + // Treat this as immediate peer closure. It's an exceptional |
| + // condition akin to a broken pipe, so we don't care about losing |
| + // messages. |
| + |
| + port->peer_closed = true; |
| + port->last_sequence_num_to_receive = |
| + port->message_queue.next_sequence_num() - 1; |
| + |
| + if (port->state == Port::kReceiving) |
| + ports_to_notify.push_back(PortRef(iter->first, port)); |
| + } |
| + |
| + // We don't expect to forward any further messages, and we don't |
| + // expect to receive a Port{Accepted,Rejected} event. Because we're |
| + // a proxy with no active peer, we cannot use the normal proxy removal |
| + // procedure of forward-propagating an ObserveProxy. Instead we |
| + // broadcast our own death so it can be back-propagated. This is |
| + // inefficient but rare. |
| + if (port->state != Port::kReceiving) { |
| + remove_port = true; |
| + dead_proxies_to_broadcast.push_back(iter->first); |
| + } |
| + } |
| + } |
| + |
| + if (remove_port) { |
| + DVLOG(2) << "Forcibly deleted port " << iter->first << "@" << name_; |
| + iter = ports_.erase(iter); |
|
Anand Mistry (off Chromium)
2016/05/16 04:03:57
um... according to http://en.cppreference.com/w/cp
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Yeah, that makes me uncomfortable. I changed it to
|
| + } else { |
| + ++iter; |
| + } |
| + } |
| + } |
| + |
| + // Wake up any receiving ports who have just observed simulated peer closure. |
| + for (size_t i = 0; i < ports_to_notify.size(); ++i) |
|
Anand Mistry (off Chromium)
2016/05/16 04:03:57
for (const auto& port : ports_to_notify)
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
|
| + delegate_->PortStatusChanged(ports_to_notify[i]); |
| + |
| + for (const auto& proxy_name : dead_proxies_to_broadcast) { |
| + // Broadcast an event signifying that this proxy is no longer functioning. |
| + ObserveProxyEventData event; |
| + event.proxy_node_name = name_; |
| + event.proxy_port_name = proxy_name; |
| + event.proxy_to_node_name = kInvalidNodeName; |
| + event.proxy_to_port_name = kInvalidPortName; |
| + delegate_->BroadcastMessage(NewInternalMessage( |
| + kInvalidPortName, EventType::kObserveProxy, event)); |
| + } |
| +} |
| + |
| ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, |
| const EventType& type, |
| const void* data, |