| Index: mojo/edk/system/ports/node.cc
|
| diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
|
| index f20acb0759db3cda826be2e1529330c7ac0a690a..630a1a2680c26be0dba4a710ba18b2e9ae095071 100644
|
| --- a/mojo/edk/system/ports/node.cc
|
| +++ b/mojo/edk/system/ports/node.cc
|
| @@ -410,49 +410,7 @@ int Node::LostConnectionToNode(const NodeName& node_name) {
|
| DVLOG(1) << "Observing lost connection from node " << name_
|
| << " to node " << node_name;
|
|
|
| - std::vector<PortRef> ports_to_notify;
|
| -
|
| - {
|
| - base::AutoLock ports_lock(ports_lock_);
|
| -
|
| - for (auto iter = ports_.begin(); iter != ports_.end(); ) {
|
| - scoped_refptr<Port>& port = iter->second;
|
| -
|
| - bool remove_port = false;
|
| - {
|
| - base::AutoLock port_lock(port->lock);
|
| -
|
| - if (port->peer_node_name == node_name) {
|
| - // We can no longer send messages to this port's peer. We assume we
|
| - // will not receive any more messages from this port's peer as well.
|
| - if (!port->peer_closed) {
|
| - port->peer_closed = true;
|
| - port->last_sequence_num_to_receive =
|
| - port->message_queue.next_sequence_num() - 1;
|
| -
|
| - if (port->state == Port::kReceiving)
|
| - ports_to_notify.push_back(PortRef(iter->first, port));
|
| - }
|
| -
|
| - // We do not expect to forward any further messages, and we do not
|
| - // expect to receive a Port{Accepted,Rejected} event.
|
| - if (port->state != Port::kReceiving)
|
| - remove_port = true;
|
| - }
|
| - }
|
| -
|
| - if (remove_port) {
|
| - DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
|
| - iter = ports_.erase(iter);
|
| - } else {
|
| - ++iter;
|
| - }
|
| - }
|
| - }
|
| -
|
| - for (size_t i = 0; i < ports_to_notify.size(); ++i)
|
| - delegate_->PortStatusChanged(ports_to_notify[i]);
|
| -
|
| + DestroyAllPortsWithPeer(node_name, kInvalidPortName);
|
| return OK;
|
| }
|
|
|
| @@ -553,8 +511,19 @@ int Node::OnPortAccepted(const PortName& port_name) {
|
|
|
| int Node::OnObserveProxy(const PortName& port_name,
|
| const ObserveProxyEventData& event) {
|
| - if (port_name == kInvalidPortName)
|
| + if (port_name == kInvalidPortName) {
|
| + // An ObserveProxy with an invalid target port name is a broadcast used to
|
| + // inform ports when their peer (which was itself a proxy) has become
|
| + // defunct due to unexpected node disconnection.
|
| + //
|
| + // Receiving ports affected by this treat it as equivalent to peer closure.
|
| + // Proxies affected by this can be removed and will in turn broadcast their
|
| + // own death with a similar message.
|
| + CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
|
| + CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
|
| + DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
|
| return OK;
|
| + }
|
|
|
| // The port may have already been closed locally, in which case the
|
| // ObserveClosure message will contain the last_sequence_num field.
|
| @@ -1318,6 +1287,84 @@ void Node::TryRemoveProxy(PortRef port_ref) {
|
| delegate_->ForwardMessage(to_node, std::move(msg));
|
| }
|
|
|
| +void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
|
| + const PortName& port_name) {
|
| + // Wipes out all ports whose peer node matches |node_name| and whose peer port
|
| + // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
|
| + // node is matched.
|
| +
|
| + std::vector<PortRef> ports_to_notify;
|
| + std::vector<PortName> dead_proxies_to_broadcast;
|
| + std::deque<PortName> referenced_port_names;
|
| +
|
| + {
|
| + base::AutoLock ports_lock(ports_lock_);
|
| +
|
| + for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
|
| + Port* port = iter->second.get();
|
| + {
|
| + base::AutoLock port_lock(port->lock);
|
| +
|
| + if (port->peer_node_name == node_name &&
|
| + (port_name == kInvalidPortName ||
|
| + port->peer_port_name == port_name)) {
|
| + if (!port->peer_closed) {
|
| + // Treat this as immediate peer closure. It's an exceptional
|
| + // condition akin to a broken pipe, so we don't care about losing
|
| + // messages.
|
| +
|
| + port->peer_closed = true;
|
| + port->last_sequence_num_to_receive =
|
| + port->message_queue.next_sequence_num() - 1;
|
| +
|
| + if (port->state == Port::kReceiving)
|
| + ports_to_notify.push_back(PortRef(iter->first, port));
|
| + }
|
| +
|
| + // We don't expect to forward any further messages, and we don't
|
| + // expect to receive a Port{Accepted,Rejected} event. Because we're
|
| + // a proxy with no active peer, we cannot use the normal proxy removal
|
| + // procedure of forward-propagating an ObserveProxy. Instead we
|
| + // broadcast our own death so it can be back-propagated. This is
|
| + // inefficient but rare.
|
| + if (port->state != Port::kReceiving) {
|
| + dead_proxies_to_broadcast.push_back(iter->first);
|
| + iter->second->message_queue.GetReferencedPorts(
|
| + &referenced_port_names);
|
| + }
|
| + }
|
| + }
|
| + }
|
| +
|
| + for (const auto& proxy_name : dead_proxies_to_broadcast) {
|
| + ports_.erase(proxy_name);
|
| + DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
|
| + }
|
| + }
|
| +
|
| + // Wake up any receiving ports who have just observed simulated peer closure.
|
| + for (const auto& port : ports_to_notify)
|
| + delegate_->PortStatusChanged(port);
|
| +
|
| + for (const auto& proxy_name : dead_proxies_to_broadcast) {
|
| + // Broadcast an event signifying that this proxy is no longer functioning.
|
| + ObserveProxyEventData event;
|
| + event.proxy_node_name = name_;
|
| + event.proxy_port_name = proxy_name;
|
| + event.proxy_to_node_name = kInvalidNodeName;
|
| + event.proxy_to_port_name = kInvalidPortName;
|
| + delegate_->BroadcastMessage(NewInternalMessage(
|
| + kInvalidPortName, EventType::kObserveProxy, event));
|
| + }
|
| +
|
| + // Close any ports referenced by the closed proxies.
|
| + for (const auto& name : referenced_port_names) {
|
| + PortRef ref;
|
| + if (GetPort(name, &ref) == OK)
|
| + ClosePort(ref);
|
| + }
|
| +}
|
| +
|
| ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
|
| const EventType& type,
|
| const void* data,
|
|
|