OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
(...skipping 392 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
403 return rv; | 403 return rv; |
404 } | 404 } |
405 | 405 |
406 int Node::LostConnectionToNode(const NodeName& node_name) { | 406 int Node::LostConnectionToNode(const NodeName& node_name) { |
407 // We can no longer send events to the given node. We also can't expect any | 407 // We can no longer send events to the given node. We also can't expect any |
408 // PortAccepted events. | 408 // PortAccepted events. |
409 | 409 |
410 DVLOG(1) << "Observing lost connection from node " << name_ | 410 DVLOG(1) << "Observing lost connection from node " << name_ |
411 << " to node " << node_name; | 411 << " to node " << node_name; |
412 | 412 |
413 std::vector<PortRef> ports_to_notify; | 413 DestroyAllPortsWithPeer(node_name, kInvalidPortName); |
414 | |
415 { | |
416 base::AutoLock ports_lock(ports_lock_); | |
417 | |
418 for (auto iter = ports_.begin(); iter != ports_.end(); ) { | |
419 scoped_refptr<Port>& port = iter->second; | |
420 | |
421 bool remove_port = false; | |
422 { | |
423 base::AutoLock port_lock(port->lock); | |
424 | |
425 if (port->peer_node_name == node_name) { | |
426 // We can no longer send messages to this port's peer. We assume we | |
427 // will not receive any more messages from this port's peer as well. | |
428 if (!port->peer_closed) { | |
429 port->peer_closed = true; | |
430 port->last_sequence_num_to_receive = | |
431 port->message_queue.next_sequence_num() - 1; | |
432 | |
433 if (port->state == Port::kReceiving) | |
434 ports_to_notify.push_back(PortRef(iter->first, port)); | |
435 } | |
436 | |
437 // We do not expect to forward any further messages, and we do not | |
438 // expect to receive a Port{Accepted,Rejected} event. | |
439 if (port->state != Port::kReceiving) | |
440 remove_port = true; | |
441 } | |
442 } | |
443 | |
444 if (remove_port) { | |
445 DVLOG(2) << "Deleted port " << iter->first << "@" << name_; | |
446 iter = ports_.erase(iter); | |
447 } else { | |
448 ++iter; | |
449 } | |
450 } | |
451 } | |
452 | |
453 for (size_t i = 0; i < ports_to_notify.size(); ++i) | |
454 delegate_->PortStatusChanged(ports_to_notify[i]); | |
455 | |
456 return OK; | 414 return OK; |
457 } | 415 } |
458 | 416 |
459 int Node::OnUserMessage(ScopedMessage message) { | 417 int Node::OnUserMessage(ScopedMessage message) { |
460 PortName port_name = GetEventHeader(*message)->port_name; | 418 PortName port_name = GetEventHeader(*message)->port_name; |
461 const auto* event = GetEventData<UserEventData>(*message); | 419 const auto* event = GetEventData<UserEventData>(*message); |
462 | 420 |
463 #if DCHECK_IS_ON() | 421 #if DCHECK_IS_ON() |
464 std::ostringstream ports_buf; | 422 std::ostringstream ports_buf; |
465 for (size_t i = 0; i < message->num_ports(); ++i) { | 423 for (size_t i = 0; i < message->num_ports(); ++i) { |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
546 | 504 |
547 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 505 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
548 << " pointing to " | 506 << " pointing to " |
549 << port->peer_port_name << "@" << port->peer_node_name; | 507 << port->peer_port_name << "@" << port->peer_node_name; |
550 | 508 |
551 return BeginProxying(PortRef(port_name, port)); | 509 return BeginProxying(PortRef(port_name, port)); |
552 } | 510 } |
553 | 511 |
554 int Node::OnObserveProxy(const PortName& port_name, | 512 int Node::OnObserveProxy(const PortName& port_name, |
555 const ObserveProxyEventData& event) { | 513 const ObserveProxyEventData& event) { |
556 if (port_name == kInvalidPortName) | 514 if (port_name == kInvalidPortName) { |
| 515 // An ObserveProxy with an invalid target port name is a broadcast used to |
| 516 // inform ports when their peer (which was itself a proxy) has become |
| 517 // defunct due to unexpected node disconnection. |
| 518 // |
| 519 // Receiving ports affected by this treat it as equivalent to peer closure. |
| 520 // Proxies affected by this can be removed and will in turn broadcast their |
| 521 // own death with a similar message. |
| 522 CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName); |
| 523 CHECK_EQ(event.proxy_to_port_name, kInvalidPortName); |
| 524 DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name); |
557 return OK; | 525 return OK; |
| 526 } |
558 | 527 |
559 // The port may have already been closed locally, in which case the | 528 // The port may have already been closed locally, in which case the |
560 // ObserveClosure message will contain the last_sequence_num field. | 529 // ObserveClosure message will contain the last_sequence_num field. |
561 // We can then silently ignore this message. | 530 // We can then silently ignore this message. |
562 scoped_refptr<Port> port = GetPort(port_name); | 531 scoped_refptr<Port> port = GetPort(port_name); |
563 if (!port) { | 532 if (!port) { |
564 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 533 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
565 | 534 |
566 if (port_name != event.proxy_port_name && | 535 if (port_name != event.proxy_port_name && |
567 port_name != event.proxy_to_port_name) { | 536 port_name != event.proxy_to_port_name) { |
(...skipping 743 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1311 } | 1280 } |
1312 } | 1281 } |
1313 | 1282 |
1314 if (should_erase) | 1283 if (should_erase) |
1315 ErasePort(port_ref.name()); | 1284 ErasePort(port_ref.name()); |
1316 | 1285 |
1317 if (msg) | 1286 if (msg) |
1318 delegate_->ForwardMessage(to_node, std::move(msg)); | 1287 delegate_->ForwardMessage(to_node, std::move(msg)); |
1319 } | 1288 } |
1320 | 1289 |
| 1290 void Node::DestroyAllPortsWithPeer(const NodeName& node_name, |
| 1291 const PortName& port_name) { |
| 1292 // Wipes out all ports whose peer node matches |node_name| and whose peer port |
| 1293 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer |
| 1294 // node is matched. |
| 1295 |
| 1296 std::vector<PortRef> ports_to_notify; |
| 1297 std::vector<PortName> dead_proxies_to_broadcast; |
| 1298 std::deque<PortName> referenced_port_names; |
| 1299 |
| 1300 { |
| 1301 base::AutoLock ports_lock(ports_lock_); |
| 1302 |
| 1303 for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) { |
| 1304 Port* port = iter->second.get(); |
| 1305 { |
| 1306 base::AutoLock port_lock(port->lock); |
| 1307 |
| 1308 if (port->peer_node_name == node_name && |
| 1309 (port_name == kInvalidPortName || |
| 1310 port->peer_port_name == port_name)) { |
| 1311 if (!port->peer_closed) { |
| 1312 // Treat this as immediate peer closure. It's an exceptional |
| 1313 // condition akin to a broken pipe, so we don't care about losing |
| 1314 // messages. |
| 1315 |
| 1316 port->peer_closed = true; |
| 1317 port->last_sequence_num_to_receive = |
| 1318 port->message_queue.next_sequence_num() - 1; |
| 1319 |
| 1320 if (port->state == Port::kReceiving) |
| 1321 ports_to_notify.push_back(PortRef(iter->first, port)); |
| 1322 } |
| 1323 |
| 1324 // We don't expect to forward any further messages, and we don't |
| 1325 // expect to receive a Port{Accepted,Rejected} event. Because we're |
| 1326 // a proxy with no active peer, we cannot use the normal proxy removal |
| 1327 // procedure of forward-propagating an ObserveProxy. Instead we |
| 1328 // broadcast our own death so it can be back-propagated. This is |
| 1329 // inefficient but rare. |
| 1330 if (port->state != Port::kReceiving) { |
| 1331 dead_proxies_to_broadcast.push_back(iter->first); |
| 1332 iter->second->message_queue.GetReferencedPorts( |
| 1333 &referenced_port_names); |
| 1334 } |
| 1335 } |
| 1336 } |
| 1337 } |
| 1338 |
| 1339 for (const auto& proxy_name : dead_proxies_to_broadcast) { |
| 1340 ports_.erase(proxy_name); |
| 1341 DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_; |
| 1342 } |
| 1343 } |
| 1344 |
| 1345 // Wake up any receiving ports who have just observed simulated peer closure. |
| 1346 for (const auto& port : ports_to_notify) |
| 1347 delegate_->PortStatusChanged(port); |
| 1348 |
| 1349 for (const auto& proxy_name : dead_proxies_to_broadcast) { |
| 1350 // Broadcast an event signifying that this proxy is no longer functioning. |
| 1351 ObserveProxyEventData event; |
| 1352 event.proxy_node_name = name_; |
| 1353 event.proxy_port_name = proxy_name; |
| 1354 event.proxy_to_node_name = kInvalidNodeName; |
| 1355 event.proxy_to_port_name = kInvalidPortName; |
| 1356 delegate_->BroadcastMessage(NewInternalMessage( |
| 1357 kInvalidPortName, EventType::kObserveProxy, event)); |
| 1358 } |
| 1359 |
| 1360 // Close any ports referenced by the closed proxies. |
| 1361 for (const auto& name : referenced_port_names) { |
| 1362 PortRef ref; |
| 1363 if (GetPort(name, &ref) == OK) |
| 1364 ClosePort(ref); |
| 1365 } |
| 1366 } |
| 1367 |
1321 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, | 1368 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, |
1322 const EventType& type, | 1369 const EventType& type, |
1323 const void* data, | 1370 const void* data, |
1324 size_t num_data_bytes) { | 1371 size_t num_data_bytes) { |
1325 ScopedMessage message; | 1372 ScopedMessage message; |
1326 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); | 1373 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); |
1327 | 1374 |
1328 EventHeader* header = GetMutableEventHeader(message.get()); | 1375 EventHeader* header = GetMutableEventHeader(message.get()); |
1329 header->port_name = port_name; | 1376 header->port_name = port_name; |
1330 header->type = type; | 1377 header->type = type; |
1331 header->padding = 0; | 1378 header->padding = 0; |
1332 | 1379 |
1333 if (num_data_bytes) | 1380 if (num_data_bytes) |
1334 memcpy(header + 1, data, num_data_bytes); | 1381 memcpy(header + 1, data, num_data_bytes); |
1335 | 1382 |
1336 return message; | 1383 return message; |
1337 } | 1384 } |
1338 | 1385 |
1339 } // namespace ports | 1386 } // namespace ports |
1340 } // namespace edk | 1387 } // namespace edk |
1341 } // namespace mojo | 1388 } // namespace mojo |
OLD | NEW |