Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(141)

Side by Side Diff: mojo/edk/system/ports/node.cc

Issue 1975073002: [mojo-edk] Broadcast surprise port disruptions (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@reenable-clean-shutdown
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/node_delegate.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/ports/node.h" 5 #include "mojo/edk/system/ports/node.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 392 matching lines...) Expand 10 before | Expand all | Expand 10 after
403 return rv; 403 return rv;
404 } 404 }
405 405
406 int Node::LostConnectionToNode(const NodeName& node_name) { 406 int Node::LostConnectionToNode(const NodeName& node_name) {
407 // We can no longer send events to the given node. We also can't expect any 407 // We can no longer send events to the given node. We also can't expect any
408 // PortAccepted events. 408 // PortAccepted events.
409 409
410 DVLOG(1) << "Observing lost connection from node " << name_ 410 DVLOG(1) << "Observing lost connection from node " << name_
411 << " to node " << node_name; 411 << " to node " << node_name;
412 412
413 std::vector<PortRef> ports_to_notify; 413 DestroyAllPortsWithPeer(node_name, kInvalidPortName);
414
415 {
416 base::AutoLock ports_lock(ports_lock_);
417
418 for (auto iter = ports_.begin(); iter != ports_.end(); ) {
419 scoped_refptr<Port>& port = iter->second;
420
421 bool remove_port = false;
422 {
423 base::AutoLock port_lock(port->lock);
424
425 if (port->peer_node_name == node_name) {
426 // We can no longer send messages to this port's peer. We assume we
427 // will not receive any more messages from this port's peer as well.
428 if (!port->peer_closed) {
429 port->peer_closed = true;
430 port->last_sequence_num_to_receive =
431 port->message_queue.next_sequence_num() - 1;
432
433 if (port->state == Port::kReceiving)
434 ports_to_notify.push_back(PortRef(iter->first, port));
435 }
436
437 // We do not expect to forward any further messages, and we do not
438 // expect to receive a Port{Accepted,Rejected} event.
439 if (port->state != Port::kReceiving)
440 remove_port = true;
441 }
442 }
443
444 if (remove_port) {
445 DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
446 iter = ports_.erase(iter);
447 } else {
448 ++iter;
449 }
450 }
451 }
452
453 for (size_t i = 0; i < ports_to_notify.size(); ++i)
454 delegate_->PortStatusChanged(ports_to_notify[i]);
455
456 return OK; 414 return OK;
457 } 415 }
458 416
459 int Node::OnUserMessage(ScopedMessage message) { 417 int Node::OnUserMessage(ScopedMessage message) {
460 PortName port_name = GetEventHeader(*message)->port_name; 418 PortName port_name = GetEventHeader(*message)->port_name;
461 const auto* event = GetEventData<UserEventData>(*message); 419 const auto* event = GetEventData<UserEventData>(*message);
462 420
463 #if DCHECK_IS_ON() 421 #if DCHECK_IS_ON()
464 std::ostringstream ports_buf; 422 std::ostringstream ports_buf;
465 for (size_t i = 0; i < message->num_ports(); ++i) { 423 for (size_t i = 0; i < message->num_ports(); ++i) {
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
546 504
547 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ 505 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
548 << " pointing to " 506 << " pointing to "
549 << port->peer_port_name << "@" << port->peer_node_name; 507 << port->peer_port_name << "@" << port->peer_node_name;
550 508
551 return BeginProxying(PortRef(port_name, port)); 509 return BeginProxying(PortRef(port_name, port));
552 } 510 }
553 511
554 int Node::OnObserveProxy(const PortName& port_name, 512 int Node::OnObserveProxy(const PortName& port_name,
555 const ObserveProxyEventData& event) { 513 const ObserveProxyEventData& event) {
556 if (port_name == kInvalidPortName) 514 if (port_name == kInvalidPortName) {
515 // An ObserveProxy with an invalid target port name is a broadcast used to
516 // inform ports when their peer (which was itself a proxy) has become
517 // defunct due to unexpected node disconnection.
518 //
519 // Receiving ports affected by this treat it as equivalent to peer closure.
520 // Proxies affected by this can be removed and will in turn broadcast their
521 // own death with a similar message.
522 CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
523 CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
524 DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
557 return OK; 525 return OK;
526 }
558 527
559 // The port may have already been closed locally, in which case the 528 // The port may have already been closed locally, in which case the
560 // ObserveClosure message will contain the last_sequence_num field. 529 // ObserveClosure message will contain the last_sequence_num field.
561 // We can then silently ignore this message. 530 // We can then silently ignore this message.
562 scoped_refptr<Port> port = GetPort(port_name); 531 scoped_refptr<Port> port = GetPort(port_name);
563 if (!port) { 532 if (!port) {
564 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; 533 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
565 534
566 if (port_name != event.proxy_port_name && 535 if (port_name != event.proxy_port_name &&
567 port_name != event.proxy_to_port_name) { 536 port_name != event.proxy_to_port_name) {
(...skipping 743 matching lines...) Expand 10 before | Expand all | Expand 10 after
1311 } 1280 }
1312 } 1281 }
1313 1282
1314 if (should_erase) 1283 if (should_erase)
1315 ErasePort(port_ref.name()); 1284 ErasePort(port_ref.name());
1316 1285
1317 if (msg) 1286 if (msg)
1318 delegate_->ForwardMessage(to_node, std::move(msg)); 1287 delegate_->ForwardMessage(to_node, std::move(msg));
1319 } 1288 }
1320 1289
1290 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1291 const PortName& port_name) {
1292 // Wipes out all ports whose peer node matches |node_name| and whose peer port
1293 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1294 // node is matched.
1295
1296 std::vector<PortRef> ports_to_notify;
1297 std::vector<PortName> dead_proxies_to_broadcast;
1298 std::deque<PortName> referenced_port_names;
1299
1300 {
1301 base::AutoLock ports_lock(ports_lock_);
1302
1303 for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
1304 Port* port = iter->second.get();
1305 {
1306 base::AutoLock port_lock(port->lock);
1307
1308 if (port->peer_node_name == node_name &&
1309 (port_name == kInvalidPortName ||
1310 port->peer_port_name == port_name)) {
1311 if (!port->peer_closed) {
1312 // Treat this as immediate peer closure. It's an exceptional
1313 // condition akin to a broken pipe, so we don't care about losing
1314 // messages.
1315
1316 port->peer_closed = true;
1317 port->last_sequence_num_to_receive =
1318 port->message_queue.next_sequence_num() - 1;
1319
1320 if (port->state == Port::kReceiving)
1321 ports_to_notify.push_back(PortRef(iter->first, port));
1322 }
1323
1324 // We don't expect to forward any further messages, and we don't
1325 // expect to receive a Port{Accepted,Rejected} event. Because we're
1326 // a proxy with no active peer, we cannot use the normal proxy removal
1327 // procedure of forward-propagating an ObserveProxy. Instead we
1328 // broadcast our own death so it can be back-propagated. This is
1329 // inefficient but rare.
1330 if (port->state != Port::kReceiving) {
1331 dead_proxies_to_broadcast.push_back(iter->first);
1332 iter->second->message_queue.GetReferencedPorts(
1333 &referenced_port_names);
1334 }
1335 }
1336 }
1337 }
1338
1339 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1340 ports_.erase(proxy_name);
1341 DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1342 }
1343 }
1344
1345 // Wake up any receiving ports who have just observed simulated peer closure.
1346 for (const auto& port : ports_to_notify)
1347 delegate_->PortStatusChanged(port);
1348
1349 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1350 // Broadcast an event signifying that this proxy is no longer functioning.
1351 ObserveProxyEventData event;
1352 event.proxy_node_name = name_;
1353 event.proxy_port_name = proxy_name;
1354 event.proxy_to_node_name = kInvalidNodeName;
1355 event.proxy_to_port_name = kInvalidPortName;
1356 delegate_->BroadcastMessage(NewInternalMessage(
1357 kInvalidPortName, EventType::kObserveProxy, event));
1358 }
1359
1360 // Close any ports referenced by the closed proxies.
1361 for (const auto& name : referenced_port_names) {
1362 PortRef ref;
1363 if (GetPort(name, &ref) == OK)
1364 ClosePort(ref);
1365 }
1366 }
1367
1321 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, 1368 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
1322 const EventType& type, 1369 const EventType& type,
1323 const void* data, 1370 const void* data,
1324 size_t num_data_bytes) { 1371 size_t num_data_bytes) {
1325 ScopedMessage message; 1372 ScopedMessage message;
1326 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); 1373 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
1327 1374
1328 EventHeader* header = GetMutableEventHeader(message.get()); 1375 EventHeader* header = GetMutableEventHeader(message.get());
1329 header->port_name = port_name; 1376 header->port_name = port_name;
1330 header->type = type; 1377 header->type = type;
1331 header->padding = 0; 1378 header->padding = 0;
1332 1379
1333 if (num_data_bytes) 1380 if (num_data_bytes)
1334 memcpy(header + 1, data, num_data_bytes); 1381 memcpy(header + 1, data, num_data_bytes);
1335 1382
1336 return message; 1383 return message;
1337 } 1384 }
1338 1385
1339 } // namespace ports 1386 } // namespace ports
1340 } // namespace edk 1387 } // namespace edk
1341 } // namespace mojo 1388 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/edk/system/ports/node_delegate.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698