OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
(...skipping 379 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
390 return rv; | 390 return rv; |
391 } | 391 } |
392 | 392 |
393 int Node::LostConnectionToNode(const NodeName& node_name) { | 393 int Node::LostConnectionToNode(const NodeName& node_name) { |
394 // We can no longer send events to the given node. We also can't expect any | 394 // We can no longer send events to the given node. We also can't expect any |
395 // PortAccepted events. | 395 // PortAccepted events. |
396 | 396 |
397 DVLOG(1) << "Observing lost connection from node " << name_ | 397 DVLOG(1) << "Observing lost connection from node " << name_ |
398 << " to node " << node_name; | 398 << " to node " << node_name; |
399 | 399 |
400 std::vector<PortRef> ports_to_notify; | 400 DestroyAllPortsWithPeer(node_name, kInvalidPortName); |
401 | |
402 { | |
403 base::AutoLock ports_lock(ports_lock_); | |
404 | |
405 for (auto iter = ports_.begin(); iter != ports_.end(); ) { | |
406 scoped_refptr<Port>& port = iter->second; | |
407 | |
408 bool remove_port = false; | |
409 { | |
410 base::AutoLock port_lock(port->lock); | |
411 | |
412 if (port->peer_node_name == node_name) { | |
413 // We can no longer send messages to this port's peer. We assume we | |
414 // will not receive any more messages from this port's peer as well. | |
415 if (!port->peer_closed) { | |
416 port->peer_closed = true; | |
417 port->last_sequence_num_to_receive = | |
418 port->message_queue.next_sequence_num() - 1; | |
419 | |
420 if (port->state == Port::kReceiving) | |
421 ports_to_notify.push_back(PortRef(iter->first, port)); | |
422 } | |
423 | |
424 // We do not expect to forward any further messages, and we do not | |
425 // expect to receive a Port{Accepted,Rejected} event. | |
426 if (port->state != Port::kReceiving) | |
427 remove_port = true; | |
428 } | |
429 } | |
430 | |
431 if (remove_port) { | |
432 DVLOG(2) << "Deleted port " << iter->first << "@" << name_; | |
433 iter = ports_.erase(iter); | |
434 } else { | |
435 ++iter; | |
436 } | |
437 } | |
438 } | |
439 | |
440 for (size_t i = 0; i < ports_to_notify.size(); ++i) | |
441 delegate_->PortStatusChanged(ports_to_notify[i]); | |
442 | |
443 return OK; | 401 return OK; |
444 } | 402 } |
445 | 403 |
446 int Node::OnUserMessage(ScopedMessage message) { | 404 int Node::OnUserMessage(ScopedMessage message) { |
447 PortName port_name = GetEventHeader(*message)->port_name; | 405 PortName port_name = GetEventHeader(*message)->port_name; |
448 const auto* event = GetEventData<UserEventData>(*message); | 406 const auto* event = GetEventData<UserEventData>(*message); |
449 | 407 |
450 #if !defined(NDEBUG) | 408 #if !defined(NDEBUG) |
451 std::ostringstream ports_buf; | 409 std::ostringstream ports_buf; |
452 for (size_t i = 0; i < message->num_ports(); ++i) { | 410 for (size_t i = 0; i < message->num_ports(); ++i) { |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
540 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 498 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
541 << " pointing to " | 499 << " pointing to " |
542 << port->peer_port_name << "@" << port->peer_node_name; | 500 << port->peer_port_name << "@" << port->peer_node_name; |
543 | 501 |
544 return BeginProxying_Locked(port.get(), port_name); | 502 return BeginProxying_Locked(port.get(), port_name); |
545 } | 503 } |
546 } | 504 } |
547 | 505 |
548 int Node::OnObserveProxy(const PortName& port_name, | 506 int Node::OnObserveProxy(const PortName& port_name, |
549 const ObserveProxyEventData& event) { | 507 const ObserveProxyEventData& event) { |
508 if (port_name == kInvalidPortName) { | |
509 // An ObserveProxy with an invalid target port name is a broadcast used to | |
510 // inform ports when their peer (which was itself a proxy) has become | |
511 // defunct due to unexpected node disconnection. | |
512 // | |
513 // Receiving ports affected by this treat it as equivalent to peer closure. | |
514 // Proxies affected by this can be removed and will in turn broadcast their | |
515 // own death with a similar message. | |
516 CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName); | |
517 CHECK_EQ(event.proxy_to_port_name, kInvalidPortName); | |
518 DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name); | |
519 return OK; | |
520 } | |
521 | |
550 // The port may have already been closed locally, in which case the | 522 // The port may have already been closed locally, in which case the |
551 // ObserveClosure message will contain the last_sequence_num field. | 523 // ObserveClosure message will contain the last_sequence_num field. |
552 // We can then silently ignore this message. | 524 // We can then silently ignore this message. |
553 scoped_refptr<Port> port = GetPort(port_name); | 525 scoped_refptr<Port> port = GetPort(port_name); |
554 if (!port) { | 526 if (!port) { |
555 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 527 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
556 | 528 |
557 if (port_name != event.proxy_port_name && | 529 if (port_name != event.proxy_port_name && |
558 port_name != event.proxy_to_port_name) { | 530 port_name != event.proxy_to_port_name) { |
559 // The receiving port may have been removed while this message was in | 531 // The receiving port may have been removed while this message was in |
(...skipping 659 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1219 ScopedMessage& message = port->send_on_proxy_removal->second; | 1191 ScopedMessage& message = port->send_on_proxy_removal->second; |
1220 | 1192 |
1221 delegate_->ForwardMessage(to_node, std::move(message)); | 1193 delegate_->ForwardMessage(to_node, std::move(message)); |
1222 } | 1194 } |
1223 } else { | 1195 } else { |
1224 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_ | 1196 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_ |
1225 << " now; waiting for more messages"; | 1197 << " now; waiting for more messages"; |
1226 } | 1198 } |
1227 } | 1199 } |
1228 | 1200 |
1201 void Node::DestroyAllPortsWithPeer(const NodeName& node_name, | |
1202 const PortName& port_name) { | |
1203 // Wipes out all ports whose peer node matches |node_name| and whose peer port | |
1204 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer | |
1205 // node is matched. | |
1206 | |
1207 std::vector<PortRef> ports_to_notify; | |
1208 std::vector<PortName> dead_proxies_to_broadcast; | |
1209 | |
1210 { | |
1211 base::AutoLock ports_lock(ports_lock_); | |
1212 | |
1213 for (auto iter = ports_.begin(); iter != ports_.end(); ) { | |
1214 scoped_refptr<Port>& port = iter->second; | |
Anand Mistry (off Chromium)
2016/05/16 04:03:57
very subjective nit: Looks like using Port* is a f
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
| |
1215 | |
1216 bool remove_port = false; | |
1217 { | |
1218 base::AutoLock port_lock(port->lock); | |
1219 | |
1220 if (port->peer_node_name == node_name && | |
1221 (port_name == kInvalidPortName || | |
1222 port->peer_port_name == port_name)) { | |
1223 if (!port->peer_closed) { | |
1224 // Treat this as immediate peer closure. It's an exceptional | |
1225 // condition akin to a broken pipe, so we don't care about losing | |
1226 // messages. | |
1227 | |
1228 port->peer_closed = true; | |
1229 port->last_sequence_num_to_receive = | |
1230 port->message_queue.next_sequence_num() - 1; | |
1231 | |
1232 if (port->state == Port::kReceiving) | |
1233 ports_to_notify.push_back(PortRef(iter->first, port)); | |
1234 } | |
1235 | |
1236 // We don't expect to forward any further messages, and we don't | |
1237 // expect to receive a Port{Accepted,Rejected} event. Because we're | |
1238 // a proxy with no active peer, we cannot use the normal proxy removal | |
1239 // procedure of forward-propagating an ObserveProxy. Instead we | |
1240 // broadcast our own death so it can be back-propagated. This is | |
1241 // inefficient but rare. | |
1242 if (port->state != Port::kReceiving) { | |
1243 remove_port = true; | |
1244 dead_proxies_to_broadcast.push_back(iter->first); | |
1245 } | |
1246 } | |
1247 } | |
1248 | |
1249 if (remove_port) { | |
1250 DVLOG(2) << "Forcibly deleted port " << iter->first << "@" << name_; | |
1251 iter = ports_.erase(iter); | |
Anand Mistry (off Chromium)
2016/05/16 04:03:57
um... according to http://en.cppreference.com/w/cp
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Yeah, that makes me uncomfortable. I changed it to
| |
1252 } else { | |
1253 ++iter; | |
1254 } | |
1255 } | |
1256 } | |
1257 | |
1258 // Wake up any receiving ports who have just observed simulated peer closure. | |
1259 for (size_t i = 0; i < ports_to_notify.size(); ++i) | |
Anand Mistry (off Chromium)
2016/05/16 04:03:57
for (const auto& port : ports_to_notify)
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
| |
1260 delegate_->PortStatusChanged(ports_to_notify[i]); | |
1261 | |
1262 for (const auto& proxy_name : dead_proxies_to_broadcast) { | |
1263 // Broadcast an event signifying that this proxy is no longer functioning. | |
1264 ObserveProxyEventData event; | |
1265 event.proxy_node_name = name_; | |
1266 event.proxy_port_name = proxy_name; | |
1267 event.proxy_to_node_name = kInvalidNodeName; | |
1268 event.proxy_to_port_name = kInvalidPortName; | |
1269 delegate_->BroadcastMessage(NewInternalMessage( | |
1270 kInvalidPortName, EventType::kObserveProxy, event)); | |
1271 } | |
1272 } | |
1273 | |
1229 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, | 1274 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, |
1230 const EventType& type, | 1275 const EventType& type, |
1231 const void* data, | 1276 const void* data, |
1232 size_t num_data_bytes) { | 1277 size_t num_data_bytes) { |
1233 ScopedMessage message; | 1278 ScopedMessage message; |
1234 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); | 1279 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); |
1235 | 1280 |
1236 EventHeader* header = GetMutableEventHeader(message.get()); | 1281 EventHeader* header = GetMutableEventHeader(message.get()); |
1237 header->port_name = port_name; | 1282 header->port_name = port_name; |
1238 header->type = type; | 1283 header->type = type; |
1239 header->padding = 0; | 1284 header->padding = 0; |
1240 | 1285 |
1241 if (num_data_bytes) | 1286 if (num_data_bytes) |
1242 memcpy(header + 1, data, num_data_bytes); | 1287 memcpy(header + 1, data, num_data_bytes); |
1243 | 1288 |
1244 return message; | 1289 return message; |
1245 } | 1290 } |
1246 | 1291 |
1247 } // namespace ports | 1292 } // namespace ports |
1248 } // namespace edk | 1293 } // namespace edk |
1249 } // namespace mojo | 1294 } // namespace mojo |
OLD | NEW |