Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/ports/node.h" | 5 #include "mojo/edk/system/ports/node.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 379 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 390 return rv; | 390 return rv; |
| 391 } | 391 } |
| 392 | 392 |
| 393 int Node::LostConnectionToNode(const NodeName& node_name) { | 393 int Node::LostConnectionToNode(const NodeName& node_name) { |
| 394 // We can no longer send events to the given node. We also can't expect any | 394 // We can no longer send events to the given node. We also can't expect any |
| 395 // PortAccepted events. | 395 // PortAccepted events. |
| 396 | 396 |
| 397 DVLOG(1) << "Observing lost connection from node " << name_ | 397 DVLOG(1) << "Observing lost connection from node " << name_ |
| 398 << " to node " << node_name; | 398 << " to node " << node_name; |
| 399 | 399 |
| 400 std::vector<PortRef> ports_to_notify; | 400 DestroyAllPortsWithPeer(node_name, kInvalidPortName); |
| 401 | |
| 402 { | |
| 403 base::AutoLock ports_lock(ports_lock_); | |
| 404 | |
| 405 for (auto iter = ports_.begin(); iter != ports_.end(); ) { | |
| 406 scoped_refptr<Port>& port = iter->second; | |
| 407 | |
| 408 bool remove_port = false; | |
| 409 { | |
| 410 base::AutoLock port_lock(port->lock); | |
| 411 | |
| 412 if (port->peer_node_name == node_name) { | |
| 413 // We can no longer send messages to this port's peer. We assume we | |
| 414 // will not receive any more messages from this port's peer as well. | |
| 415 if (!port->peer_closed) { | |
| 416 port->peer_closed = true; | |
| 417 port->last_sequence_num_to_receive = | |
| 418 port->message_queue.next_sequence_num() - 1; | |
| 419 | |
| 420 if (port->state == Port::kReceiving) | |
| 421 ports_to_notify.push_back(PortRef(iter->first, port)); | |
| 422 } | |
| 423 | |
| 424 // We do not expect to forward any further messages, and we do not | |
| 425 // expect to receive a Port{Accepted,Rejected} event. | |
| 426 if (port->state != Port::kReceiving) | |
| 427 remove_port = true; | |
| 428 } | |
| 429 } | |
| 430 | |
| 431 if (remove_port) { | |
| 432 DVLOG(2) << "Deleted port " << iter->first << "@" << name_; | |
| 433 iter = ports_.erase(iter); | |
| 434 } else { | |
| 435 ++iter; | |
| 436 } | |
| 437 } | |
| 438 } | |
| 439 | |
| 440 for (size_t i = 0; i < ports_to_notify.size(); ++i) | |
| 441 delegate_->PortStatusChanged(ports_to_notify[i]); | |
| 442 | |
| 443 return OK; | 401 return OK; |
| 444 } | 402 } |
| 445 | 403 |
| 446 int Node::OnUserMessage(ScopedMessage message) { | 404 int Node::OnUserMessage(ScopedMessage message) { |
| 447 PortName port_name = GetEventHeader(*message)->port_name; | 405 PortName port_name = GetEventHeader(*message)->port_name; |
| 448 const auto* event = GetEventData<UserEventData>(*message); | 406 const auto* event = GetEventData<UserEventData>(*message); |
| 449 | 407 |
| 450 #if !defined(NDEBUG) | 408 #if !defined(NDEBUG) |
| 451 std::ostringstream ports_buf; | 409 std::ostringstream ports_buf; |
| 452 for (size_t i = 0; i < message->num_ports(); ++i) { | 410 for (size_t i = 0; i < message->num_ports(); ++i) { |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 540 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ | 498 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ |
| 541 << " pointing to " | 499 << " pointing to " |
| 542 << port->peer_port_name << "@" << port->peer_node_name; | 500 << port->peer_port_name << "@" << port->peer_node_name; |
| 543 | 501 |
| 544 return BeginProxying_Locked(port.get(), port_name); | 502 return BeginProxying_Locked(port.get(), port_name); |
| 545 } | 503 } |
| 546 } | 504 } |
| 547 | 505 |
| 548 int Node::OnObserveProxy(const PortName& port_name, | 506 int Node::OnObserveProxy(const PortName& port_name, |
| 549 const ObserveProxyEventData& event) { | 507 const ObserveProxyEventData& event) { |
| 508 if (port_name == kInvalidPortName) { | |
| 509 // An ObserveProxy with an invalid target port name is a broadcast used to | |
| 510 // inform ports when their peer (which was itself a proxy) has become | |
| 511 // defunct due to unexpected node disconnection. | |
| 512 // | |
| 513 // Receiving ports affected by this treat it as equivalent to peer closure. | |
| 514 // Proxies affected by this can be removed and will in turn broadcast their | |
| 515 // own death with a similar message. | |
| 516 CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName); | |
| 517 CHECK_EQ(event.proxy_to_port_name, kInvalidPortName); | |
| 518 DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name); | |
| 519 return OK; | |
| 520 } | |
| 521 | |
| 550 // The port may have already been closed locally, in which case the | 522 // The port may have already been closed locally, in which case the |
| 551 // ObserveClosure message will contain the last_sequence_num field. | 523 // ObserveClosure message will contain the last_sequence_num field. |
| 552 // We can then silently ignore this message. | 524 // We can then silently ignore this message. |
| 553 scoped_refptr<Port> port = GetPort(port_name); | 525 scoped_refptr<Port> port = GetPort(port_name); |
| 554 if (!port) { | 526 if (!port) { |
| 555 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; | 527 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; |
| 556 | 528 |
| 557 if (port_name != event.proxy_port_name && | 529 if (port_name != event.proxy_port_name && |
| 558 port_name != event.proxy_to_port_name) { | 530 port_name != event.proxy_to_port_name) { |
| 559 // The receiving port may have been removed while this message was in | 531 // The receiving port may have been removed while this message was in |
| (...skipping 659 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1219 ScopedMessage& message = port->send_on_proxy_removal->second; | 1191 ScopedMessage& message = port->send_on_proxy_removal->second; |
| 1220 | 1192 |
| 1221 delegate_->ForwardMessage(to_node, std::move(message)); | 1193 delegate_->ForwardMessage(to_node, std::move(message)); |
| 1222 } | 1194 } |
| 1223 } else { | 1195 } else { |
| 1224 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_ | 1196 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_ |
| 1225 << " now; waiting for more messages"; | 1197 << " now; waiting for more messages"; |
| 1226 } | 1198 } |
| 1227 } | 1199 } |
| 1228 | 1200 |
| 1201 void Node::DestroyAllPortsWithPeer(const NodeName& node_name, | |
| 1202 const PortName& port_name) { | |
| 1203 // Wipes out all ports whose peer node matches |node_name| and whose peer port | |
| 1204 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer | |
| 1205 // node is matched. | |
| 1206 | |
| 1207 std::vector<PortRef> ports_to_notify; | |
| 1208 std::vector<PortName> dead_proxies_to_broadcast; | |
| 1209 | |
| 1210 { | |
| 1211 base::AutoLock ports_lock(ports_lock_); | |
| 1212 | |
| 1213 for (auto iter = ports_.begin(); iter != ports_.end(); ) { | |
| 1214 scoped_refptr<Port>& port = iter->second; | |
|
Anand Mistry (off Chromium)
2016/05/16 04:03:57
very subjective nit: Looks like using Port* is a f
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
| |
| 1215 | |
| 1216 bool remove_port = false; | |
| 1217 { | |
| 1218 base::AutoLock port_lock(port->lock); | |
| 1219 | |
| 1220 if (port->peer_node_name == node_name && | |
| 1221 (port_name == kInvalidPortName || | |
| 1222 port->peer_port_name == port_name)) { | |
| 1223 if (!port->peer_closed) { | |
| 1224 // Treat this as immediate peer closure. It's an exceptional | |
| 1225 // condition akin to a broken pipe, so we don't care about losing | |
| 1226 // messages. | |
| 1227 | |
| 1228 port->peer_closed = true; | |
| 1229 port->last_sequence_num_to_receive = | |
| 1230 port->message_queue.next_sequence_num() - 1; | |
| 1231 | |
| 1232 if (port->state == Port::kReceiving) | |
| 1233 ports_to_notify.push_back(PortRef(iter->first, port)); | |
| 1234 } | |
| 1235 | |
| 1236 // We don't expect to forward any further messages, and we don't | |
| 1237 // expect to receive a Port{Accepted,Rejected} event. Because we're | |
| 1238 // a proxy with no active peer, we cannot use the normal proxy removal | |
| 1239 // procedure of forward-propagating an ObserveProxy. Instead we | |
| 1240 // broadcast our own death so it can be back-propagated. This is | |
| 1241 // inefficient but rare. | |
| 1242 if (port->state != Port::kReceiving) { | |
| 1243 remove_port = true; | |
| 1244 dead_proxies_to_broadcast.push_back(iter->first); | |
| 1245 } | |
| 1246 } | |
| 1247 } | |
| 1248 | |
| 1249 if (remove_port) { | |
| 1250 DVLOG(2) << "Forcibly deleted port " << iter->first << "@" << name_; | |
| 1251 iter = ports_.erase(iter); | |
|
Anand Mistry (off Chromium)
2016/05/16 04:03:57
um... according to http://en.cppreference.com/w/cp
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Yeah, that makes me uncomfortable. I changed it to
| |
| 1252 } else { | |
| 1253 ++iter; | |
| 1254 } | |
| 1255 } | |
| 1256 } | |
| 1257 | |
| 1258 // Wake up any receiving ports who have just observed simulated peer closure. | |
| 1259 for (size_t i = 0; i < ports_to_notify.size(); ++i) | |
|
Anand Mistry (off Chromium)
2016/05/16 04:03:57
for (const auto& port : ports_to_notify)
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
| |
| 1260 delegate_->PortStatusChanged(ports_to_notify[i]); | |
| 1261 | |
| 1262 for (const auto& proxy_name : dead_proxies_to_broadcast) { | |
| 1263 // Broadcast an event signifying that this proxy is no longer functioning. | |
| 1264 ObserveProxyEventData event; | |
| 1265 event.proxy_node_name = name_; | |
| 1266 event.proxy_port_name = proxy_name; | |
| 1267 event.proxy_to_node_name = kInvalidNodeName; | |
| 1268 event.proxy_to_port_name = kInvalidPortName; | |
| 1269 delegate_->BroadcastMessage(NewInternalMessage( | |
| 1270 kInvalidPortName, EventType::kObserveProxy, event)); | |
| 1271 } | |
| 1272 } | |
| 1273 | |
| 1229 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, | 1274 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, |
| 1230 const EventType& type, | 1275 const EventType& type, |
| 1231 const void* data, | 1276 const void* data, |
| 1232 size_t num_data_bytes) { | 1277 size_t num_data_bytes) { |
| 1233 ScopedMessage message; | 1278 ScopedMessage message; |
| 1234 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); | 1279 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); |
| 1235 | 1280 |
| 1236 EventHeader* header = GetMutableEventHeader(message.get()); | 1281 EventHeader* header = GetMutableEventHeader(message.get()); |
| 1237 header->port_name = port_name; | 1282 header->port_name = port_name; |
| 1238 header->type = type; | 1283 header->type = type; |
| 1239 header->padding = 0; | 1284 header->padding = 0; |
| 1240 | 1285 |
| 1241 if (num_data_bytes) | 1286 if (num_data_bytes) |
| 1242 memcpy(header + 1, data, num_data_bytes); | 1287 memcpy(header + 1, data, num_data_bytes); |
| 1243 | 1288 |
| 1244 return message; | 1289 return message; |
| 1245 } | 1290 } |
| 1246 | 1291 |
| 1247 } // namespace ports | 1292 } // namespace ports |
| 1248 } // namespace edk | 1293 } // namespace edk |
| 1249 } // namespace mojo | 1294 } // namespace mojo |
| OLD | NEW |