Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(388)

Side by Side Diff: mojo/edk/system/ports/node.cc

Issue 1975073002: [mojo-edk] Broadcast surprise port disruptions (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@reenable-clean-shutdown
Patch Set: rebase Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/ports/node.h" 5 #include "mojo/edk/system/ports/node.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 379 matching lines...) Expand 10 before | Expand all | Expand 10 after
390 return rv; 390 return rv;
391 } 391 }
392 392
393 int Node::LostConnectionToNode(const NodeName& node_name) { 393 int Node::LostConnectionToNode(const NodeName& node_name) {
394 // We can no longer send events to the given node. We also can't expect any 394 // We can no longer send events to the given node. We also can't expect any
395 // PortAccepted events. 395 // PortAccepted events.
396 396
397 DVLOG(1) << "Observing lost connection from node " << name_ 397 DVLOG(1) << "Observing lost connection from node " << name_
398 << " to node " << node_name; 398 << " to node " << node_name;
399 399
400 std::vector<PortRef> ports_to_notify; 400 DestroyAllPortsWithPeer(node_name, kInvalidPortName);
401
402 {
403 base::AutoLock ports_lock(ports_lock_);
404
405 for (auto iter = ports_.begin(); iter != ports_.end(); ) {
406 scoped_refptr<Port>& port = iter->second;
407
408 bool remove_port = false;
409 {
410 base::AutoLock port_lock(port->lock);
411
412 if (port->peer_node_name == node_name) {
413 // We can no longer send messages to this port's peer. We assume we
414 // will not receive any more messages from this port's peer as well.
415 if (!port->peer_closed) {
416 port->peer_closed = true;
417 port->last_sequence_num_to_receive =
418 port->message_queue.next_sequence_num() - 1;
419
420 if (port->state == Port::kReceiving)
421 ports_to_notify.push_back(PortRef(iter->first, port));
422 }
423
424 // We do not expect to forward any further messages, and we do not
425 // expect to receive a Port{Accepted,Rejected} event.
426 if (port->state != Port::kReceiving)
427 remove_port = true;
428 }
429 }
430
431 if (remove_port) {
432 DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
433 iter = ports_.erase(iter);
434 } else {
435 ++iter;
436 }
437 }
438 }
439
440 for (size_t i = 0; i < ports_to_notify.size(); ++i)
441 delegate_->PortStatusChanged(ports_to_notify[i]);
442
443 return OK; 401 return OK;
444 } 402 }
445 403
446 int Node::OnUserMessage(ScopedMessage message) { 404 int Node::OnUserMessage(ScopedMessage message) {
447 PortName port_name = GetEventHeader(*message)->port_name; 405 PortName port_name = GetEventHeader(*message)->port_name;
448 const auto* event = GetEventData<UserEventData>(*message); 406 const auto* event = GetEventData<UserEventData>(*message);
449 407
450 #if !defined(NDEBUG) 408 #if !defined(NDEBUG)
451 std::ostringstream ports_buf; 409 std::ostringstream ports_buf;
452 for (size_t i = 0; i < message->num_ports(); ++i) { 410 for (size_t i = 0; i < message->num_ports(); ++i) {
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
540 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ 498 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
541 << " pointing to " 499 << " pointing to "
542 << port->peer_port_name << "@" << port->peer_node_name; 500 << port->peer_port_name << "@" << port->peer_node_name;
543 501
544 return BeginProxying_Locked(port.get(), port_name); 502 return BeginProxying_Locked(port.get(), port_name);
545 } 503 }
546 } 504 }
547 505
548 int Node::OnObserveProxy(const PortName& port_name, 506 int Node::OnObserveProxy(const PortName& port_name,
549 const ObserveProxyEventData& event) { 507 const ObserveProxyEventData& event) {
508 if (port_name == kInvalidPortName) {
509 // An ObserveProxy with an invalid target port name is a broadcast used to
510 // inform ports when their peer (which was itself a proxy) has become
511 // defunct due to unexpected node disconnection.
512 //
513 // Receiving ports affected by this treat it as equivalent to peer closure.
514 // Proxies affected by this can be removed and will in turn broadcast their
515 // own death with a similar message.
516 CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
517 CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
518 DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
519 return OK;
520 }
521
550 // The port may have already been closed locally, in which case the 522 // The port may have already been closed locally, in which case the
551 // ObserveClosure message will contain the last_sequence_num field. 523 // ObserveClosure message will contain the last_sequence_num field.
552 // We can then silently ignore this message. 524 // We can then silently ignore this message.
553 scoped_refptr<Port> port = GetPort(port_name); 525 scoped_refptr<Port> port = GetPort(port_name);
554 if (!port) { 526 if (!port) {
555 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; 527 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
556 528
557 if (port_name != event.proxy_port_name && 529 if (port_name != event.proxy_port_name &&
558 port_name != event.proxy_to_port_name) { 530 port_name != event.proxy_to_port_name) {
559 // The receiving port may have been removed while this message was in 531 // The receiving port may have been removed while this message was in
(...skipping 659 matching lines...) Expand 10 before | Expand all | Expand 10 after
1219 ScopedMessage& message = port->send_on_proxy_removal->second; 1191 ScopedMessage& message = port->send_on_proxy_removal->second;
1220 1192
1221 delegate_->ForwardMessage(to_node, std::move(message)); 1193 delegate_->ForwardMessage(to_node, std::move(message));
1222 } 1194 }
1223 } else { 1195 } else {
1224 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_ 1196 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
1225 << " now; waiting for more messages"; 1197 << " now; waiting for more messages";
1226 } 1198 }
1227 } 1199 }
1228 1200
1201 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1202 const PortName& port_name) {
1203 // Wipes out all ports whose peer node matches |node_name| and whose peer port
1204 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1205 // node is matched.
1206
1207 std::vector<PortRef> ports_to_notify;
1208 std::vector<PortName> dead_proxies_to_broadcast;
1209
1210 {
1211 base::AutoLock ports_lock(ports_lock_);
1212
1213 for (auto iter = ports_.begin(); iter != ports_.end(); ) {
1214 scoped_refptr<Port>& port = iter->second;
Anand Mistry (off Chromium) 2016/05/16 04:03:57 very subjective nit: Looks like using Port* is a f
Ken Rockot(use gerrit already) 2016/05/16 04:42:36 Done
1215
1216 bool remove_port = false;
1217 {
1218 base::AutoLock port_lock(port->lock);
1219
1220 if (port->peer_node_name == node_name &&
1221 (port_name == kInvalidPortName ||
1222 port->peer_port_name == port_name)) {
1223 if (!port->peer_closed) {
1224 // Treat this as immediate peer closure. It's an exceptional
1225 // condition akin to a broken pipe, so we don't care about losing
1226 // messages.
1227
1228 port->peer_closed = true;
1229 port->last_sequence_num_to_receive =
1230 port->message_queue.next_sequence_num() - 1;
1231
1232 if (port->state == Port::kReceiving)
1233 ports_to_notify.push_back(PortRef(iter->first, port));
1234 }
1235
1236 // We don't expect to forward any further messages, and we don't
1237 // expect to receive a Port{Accepted,Rejected} event. Because we're
1238 // a proxy with no active peer, we cannot use the normal proxy removal
1239 // procedure of forward-propagating an ObserveProxy. Instead we
1240 // broadcast our own death so it can be back-propagated. This is
1241 // inefficient but rare.
1242 if (port->state != Port::kReceiving) {
1243 remove_port = true;
1244 dead_proxies_to_broadcast.push_back(iter->first);
1245 }
1246 }
1247 }
1248
1249 if (remove_port) {
1250 DVLOG(2) << "Forcibly deleted port " << iter->first << "@" << name_;
1251 iter = ports_.erase(iter);
Anand Mistry (off Chromium) 2016/05/16 04:03:57 um... according to http://en.cppreference.com/w/cp
Ken Rockot(use gerrit already) 2016/05/16 04:42:36 Yeah, that makes me uncomfortable. I changed it to
1252 } else {
1253 ++iter;
1254 }
1255 }
1256 }
1257
1258 // Wake up any receiving ports who have just observed simulated peer closure.
1259 for (size_t i = 0; i < ports_to_notify.size(); ++i)
Anand Mistry (off Chromium) 2016/05/16 04:03:57 for (const auto& port : ports_to_notify)
Ken Rockot(use gerrit already) 2016/05/16 04:42:36 Done
1260 delegate_->PortStatusChanged(ports_to_notify[i]);
1261
1262 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1263 // Broadcast an event signifying that this proxy is no longer functioning.
1264 ObserveProxyEventData event;
1265 event.proxy_node_name = name_;
1266 event.proxy_port_name = proxy_name;
1267 event.proxy_to_node_name = kInvalidNodeName;
1268 event.proxy_to_port_name = kInvalidPortName;
1269 delegate_->BroadcastMessage(NewInternalMessage(
1270 kInvalidPortName, EventType::kObserveProxy, event));
1271 }
1272 }
1273
1229 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, 1274 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
1230 const EventType& type, 1275 const EventType& type,
1231 const void* data, 1276 const void* data,
1232 size_t num_data_bytes) { 1277 size_t num_data_bytes) {
1233 ScopedMessage message; 1278 ScopedMessage message;
1234 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); 1279 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
1235 1280
1236 EventHeader* header = GetMutableEventHeader(message.get()); 1281 EventHeader* header = GetMutableEventHeader(message.get());
1237 header->port_name = port_name; 1282 header->port_name = port_name;
1238 header->type = type; 1283 header->type = type;
1239 header->padding = 0; 1284 header->padding = 0;
1240 1285
1241 if (num_data_bytes) 1286 if (num_data_bytes)
1242 memcpy(header + 1, data, num_data_bytes); 1287 memcpy(header + 1, data, num_data_bytes);
1243 1288
1244 return message; 1289 return message;
1245 } 1290 }
1246 1291
1247 } // namespace ports 1292 } // namespace ports
1248 } // namespace edk 1293 } // namespace edk
1249 } // namespace mojo 1294 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698