Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(785)

Unified Diff: mojo/edk/system/ports/node.cc

Issue 1975073002: [mojo-edk] Broadcast surprise port disruptions (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@reenable-clean-shutdown
Patch Set: . Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/node_delegate.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/ports/node.cc
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
index f20acb0759db3cda826be2e1529330c7ac0a690a..630a1a2680c26be0dba4a710ba18b2e9ae095071 100644
--- a/mojo/edk/system/ports/node.cc
+++ b/mojo/edk/system/ports/node.cc
@@ -410,49 +410,7 @@ int Node::LostConnectionToNode(const NodeName& node_name) {
DVLOG(1) << "Observing lost connection from node " << name_
<< " to node " << node_name;
- std::vector<PortRef> ports_to_notify;
-
- {
- base::AutoLock ports_lock(ports_lock_);
-
- for (auto iter = ports_.begin(); iter != ports_.end(); ) {
- scoped_refptr<Port>& port = iter->second;
-
- bool remove_port = false;
- {
- base::AutoLock port_lock(port->lock);
-
- if (port->peer_node_name == node_name) {
- // We can no longer send messages to this port's peer. We assume we
- // will not receive any more messages from this port's peer as well.
- if (!port->peer_closed) {
- port->peer_closed = true;
- port->last_sequence_num_to_receive =
- port->message_queue.next_sequence_num() - 1;
-
- if (port->state == Port::kReceiving)
- ports_to_notify.push_back(PortRef(iter->first, port));
- }
-
- // We do not expect to forward any further messages, and we do not
- // expect to receive a Port{Accepted,Rejected} event.
- if (port->state != Port::kReceiving)
- remove_port = true;
- }
- }
-
- if (remove_port) {
- DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
- iter = ports_.erase(iter);
- } else {
- ++iter;
- }
- }
- }
-
- for (size_t i = 0; i < ports_to_notify.size(); ++i)
- delegate_->PortStatusChanged(ports_to_notify[i]);
-
+ DestroyAllPortsWithPeer(node_name, kInvalidPortName);
return OK;
}
@@ -553,8 +511,19 @@ int Node::OnPortAccepted(const PortName& port_name) {
int Node::OnObserveProxy(const PortName& port_name,
const ObserveProxyEventData& event) {
- if (port_name == kInvalidPortName)
+ if (port_name == kInvalidPortName) {
+ // An ObserveProxy with an invalid target port name is a broadcast used to
+ // inform ports when their peer (which was itself a proxy) has become
+ // defunct due to unexpected node disconnection.
+ //
+ // Receiving ports affected by this treat it as equivalent to peer closure.
+ // Proxies affected by this can be removed and will in turn broadcast their
+ // own death with a similar message.
+ CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
+ CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
+ DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
return OK;
+ }
// The port may have already been closed locally, in which case the
// ObserveClosure message will contain the last_sequence_num field.
@@ -1318,6 +1287,84 @@ void Node::TryRemoveProxy(PortRef port_ref) {
delegate_->ForwardMessage(to_node, std::move(msg));
}
+void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
+ const PortName& port_name) {
+ // Wipes out all ports whose peer node matches |node_name| and whose peer port
+ // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
+ // node is matched.
+
+ std::vector<PortRef> ports_to_notify;
+ std::vector<PortName> dead_proxies_to_broadcast;
+ std::deque<PortName> referenced_port_names;
+
+ {
+ base::AutoLock ports_lock(ports_lock_);
+
+ for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
+ Port* port = iter->second.get();
+ {
+ base::AutoLock port_lock(port->lock);
+
+ if (port->peer_node_name == node_name &&
+ (port_name == kInvalidPortName ||
+ port->peer_port_name == port_name)) {
+ if (!port->peer_closed) {
+ // Treat this as immediate peer closure. It's an exceptional
+ // condition akin to a broken pipe, so we don't care about losing
+ // messages.
+
+ port->peer_closed = true;
+ port->last_sequence_num_to_receive =
+ port->message_queue.next_sequence_num() - 1;
+
+ if (port->state == Port::kReceiving)
+ ports_to_notify.push_back(PortRef(iter->first, port));
+ }
+
+ // We don't expect to forward any further messages, and we don't
+ // expect to receive a Port{Accepted,Rejected} event. Because we're
+ // a proxy with no active peer, we cannot use the normal proxy removal
+ // procedure of forward-propagating an ObserveProxy. Instead we
+ // broadcast our own death so it can be back-propagated. This is
+ // inefficient but rare.
+ if (port->state != Port::kReceiving) {
+ dead_proxies_to_broadcast.push_back(iter->first);
+ iter->second->message_queue.GetReferencedPorts(
+ &referenced_port_names);
+ }
+ }
+ }
+ }
+
+ for (const auto& proxy_name : dead_proxies_to_broadcast) {
+ ports_.erase(proxy_name);
+ DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
+ }
+ }
+
+ // Wake up any receiving ports who have just observed simulated peer closure.
+ for (const auto& port : ports_to_notify)
+ delegate_->PortStatusChanged(port);
+
+ for (const auto& proxy_name : dead_proxies_to_broadcast) {
+ // Broadcast an event signifying that this proxy is no longer functioning.
+ ObserveProxyEventData event;
+ event.proxy_node_name = name_;
+ event.proxy_port_name = proxy_name;
+ event.proxy_to_node_name = kInvalidNodeName;
+ event.proxy_to_port_name = kInvalidPortName;
+ delegate_->BroadcastMessage(NewInternalMessage(
+ kInvalidPortName, EventType::kObserveProxy, event));
+ }
+
+ // Close any ports referenced by the closed proxies.
+ for (const auto& name : referenced_port_names) {
+ PortRef ref;
+ if (GetPort(name, &ref) == OK)
+ ClosePort(ref);
+ }
+}
+
ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
const EventType& type,
const void* data,
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/node_delegate.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698