OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/edk/system/ports/node.h" |
| 6 |
| 7 #include <string.h> |
| 8 |
| 9 #include "base/logging.h" |
| 10 #include "base/memory/ref_counted.h" |
| 11 #include "base/synchronization/lock.h" |
| 12 #include "mojo/edk/system/ports/node_delegate.h" |
| 13 |
| 14 namespace mojo { |
| 15 namespace edk { |
| 16 namespace ports { |
| 17 |
| 18 namespace { |
| 19 |
// Crash helper used via the OOPS() macro below: CHECK-fails with |message|
// so internal protocol violations are reported loudly. The return value
// exists only so calls can appear in return-expression position; it is never
// actually reached because CHECK(false) aborts.
int DebugError(const char* message, int error_code) {
  CHECK(false) << "Oops: " << message;
  return error_code;
}
| 24 |
| 25 #define OOPS(x) DebugError(#x, x) |
| 26 |
// Returns true if |port| may still receive messages, i.e. the last expected
// message (per |last_sequence_num_to_receive|) has not yet been doled out.
// Callers in this file hold |port|'s lock when invoking this.
bool CanAcceptMoreMessages(const Port* port) {
  // Have we already doled out the last message (i.e., do we expect to NOT
  // receive further messages)?
  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
  if (port->peer_closed || port->remove_proxy_on_last_message) {
    // |next_sequence_num| is the sequence number of the NEXT message to be
    // read, so next_sequence_num - 1 is the last one already consumed.
    if (port->last_sequence_num_to_receive == next_sequence_num - 1)
      return false;
  }
  return true;
}
| 37 |
| 38 } // namespace |
| 39 |
| 40 Node::Node(const NodeName& name, NodeDelegate* delegate) |
| 41 : name_(name), |
| 42 delegate_(delegate) { |
| 43 } |
| 44 |
| 45 Node::~Node() { |
| 46 if (!ports_.empty()) |
| 47 DLOG(WARNING) << "Unclean shutdown for node " << name_; |
| 48 } |
| 49 |
| 50 bool Node::CanShutdownCleanly(bool allow_local_ports) { |
| 51 base::AutoLock ports_lock(ports_lock_); |
| 52 |
| 53 if (!allow_local_ports) { |
| 54 #if !defined(NDEBUG) |
| 55 for (auto entry : ports_) { |
| 56 DVLOG(2) << "Port " << entry.first << " referencing node " |
| 57 << entry.second->peer_node_name << " is blocking shutdown of " |
| 58 << "node " << name_ << " (state=" << entry.second->state << ")"; |
| 59 } |
| 60 #endif |
| 61 return ports_.empty(); |
| 62 } |
| 63 |
| 64 // NOTE: This is not efficient, though it probably doesn't need to be since |
| 65 // relatively few ports should be open during shutdown and shutdown doesn't |
| 66 // need to be blazingly fast. |
| 67 bool can_shutdown = true; |
| 68 for (auto entry : ports_) { |
| 69 base::AutoLock lock(entry.second->lock); |
| 70 if (entry.second->peer_node_name != name_ && |
| 71 entry.second->state != Port::kReceiving) { |
| 72 can_shutdown = false; |
| 73 #if !defined(NDEBUG) |
| 74 DVLOG(2) << "Port " << entry.first << " referencing node " |
| 75 << entry.second->peer_node_name << " is blocking shutdown of " |
| 76 << "node " << name_ << " (state=" << entry.second->state << ")"; |
| 77 #else |
| 78 // Exit early when not debugging. |
| 79 break; |
| 80 #endif |
| 81 } |
| 82 } |
| 83 |
| 84 return can_shutdown; |
| 85 } |
| 86 |
| 87 int Node::GetPort(const PortName& port_name, PortRef* port_ref) { |
| 88 scoped_refptr<Port> port = GetPort(port_name); |
| 89 if (!port) |
| 90 return ERROR_PORT_UNKNOWN; |
| 91 |
| 92 *port_ref = PortRef(port_name, std::move(port)); |
| 93 return OK; |
| 94 } |
| 95 |
| 96 int Node::CreateUninitializedPort(PortRef* port_ref) { |
| 97 PortName port_name; |
| 98 delegate_->GenerateRandomPortName(&port_name); |
| 99 |
| 100 scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum, |
| 101 kInitialSequenceNum)); |
| 102 int rv = AddPortWithName(port_name, port); |
| 103 if (rv != OK) |
| 104 return rv; |
| 105 |
| 106 *port_ref = PortRef(port_name, std::move(port)); |
| 107 return OK; |
| 108 } |
| 109 |
// Transitions an uninitialized port into the kReceiving state, pointing it
// at the given peer, then notifies the delegate of the status change.
// Returns ERROR_PORT_STATE_UNEXPECTED if the port was already initialized.
int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name) {
  Port* port = port_ref.port();

  {
    base::AutoLock lock(port->lock);
    if (port->state != Port::kUninitialized)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kReceiving;
    port->peer_node_name = peer_node_name;
    port->peer_port_name = peer_port_name;
  }

  // Note: the delegate is notified only after the port lock is released.
  delegate_->PortStatusChanged(port_ref);

  return OK;
}
| 129 |
| 130 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) { |
| 131 int rv; |
| 132 |
| 133 rv = CreateUninitializedPort(port0_ref); |
| 134 if (rv != OK) |
| 135 return rv; |
| 136 |
| 137 rv = CreateUninitializedPort(port1_ref); |
| 138 if (rv != OK) |
| 139 return rv; |
| 140 |
| 141 rv = InitializePort(*port0_ref, name_, port1_ref->name()); |
| 142 if (rv != OK) |
| 143 return rv; |
| 144 |
| 145 rv = InitializePort(*port1_ref, name_, port0_ref->name()); |
| 146 if (rv != OK) |
| 147 return rv; |
| 148 |
| 149 return OK; |
| 150 } |
| 151 |
| 152 int Node::SetUserData(const PortRef& port_ref, |
| 153 const scoped_refptr<UserData>& user_data) { |
| 154 Port* port = port_ref.port(); |
| 155 |
| 156 base::AutoLock lock(port->lock); |
| 157 if (port->state == Port::kClosed) |
| 158 return ERROR_PORT_STATE_UNEXPECTED; |
| 159 |
| 160 port->user_data = std::move(user_data); |
| 161 |
| 162 return OK; |
| 163 } |
| 164 |
| 165 int Node::GetUserData(const PortRef& port_ref, |
| 166 scoped_refptr<UserData>* user_data) { |
| 167 Port* port = port_ref.port(); |
| 168 |
| 169 base::AutoLock lock(port->lock); |
| 170 if (port->state == Port::kClosed) |
| 171 return ERROR_PORT_STATE_UNEXPECTED; |
| 172 |
| 173 *user_data = port->user_data; |
| 174 |
| 175 return OK; |
| 176 } |
| 177 |
// Closes the receiving port held by |port_ref|. The peer is notified via an
// ObserveClosure event carrying the last sequence number sent from here, and
// any ports referenced by messages still queued on this port are recursively
// closed so they are not leaked.
int Node::ClosePort(const PortRef& port_ref) {
  std::deque<PortName> referenced_port_names;

  ObserveClosureEventData data;

  NodeName peer_node_name;
  PortName peer_port_name;
  Port* port = port_ref.port();
  {
    // We may need to erase the port, which requires ports_lock_ to be held,
    // but ports_lock_ must be acquired before any individual port locks.
    base::AutoLock ports_lock(ports_lock_);

    base::AutoLock lock(port->lock);
    if (port->state == Port::kUninitialized) {
      // If the port was not yet initialized, there's nothing interesting to do.
      ErasePort_Locked(port_ref.name());
      return OK;
    }

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kClosed;

    // We pass along the sequence number of the last message sent from this
    // port to allow the peer to have the opportunity to consume all inbound
    // messages before notifying the embedder that this port is closed.
    data.last_sequence_num = port->next_sequence_num_to_send - 1;

    // Snapshot routing info so the notification can be sent without holding
    // the port lock.
    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    // If the port being closed still has unread messages, then we need to take
    // care to close those ports so as to avoid leaking memory.
    port->message_queue.GetReferencedPorts(&referenced_port_names);
  }

  DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
           << " to " << peer_port_name << "@" << peer_node_name;

  ErasePort(port_ref.name());

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));

  // Recursively close ports that were carried by still-unread messages.
  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
  return OK;
}
| 232 |
// Reports whether the port has queued messages, whether it can still receive
// more, and whether its peer has closed. Only valid for ports in the
// kReceiving state.
int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);

  if (port->state != Port::kReceiving)
    return ERROR_PORT_STATE_UNEXPECTED;

  port_status->has_messages = port->message_queue.HasNextMessage();
  port_status->receiving_messages = CanAcceptMoreMessages(port);
  port_status->peer_closed = port->peer_closed;
  return OK;
}
| 246 |
// Retrieves the next available message on the port with no filtering.
// Equivalent to GetMessageIf() with a null selector.
int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
  return GetMessageIf(port_ref, nullptr, message);
}
| 250 |
// Retrieves the next queued message matching |selector| (any message if
// |selector| is null), leaving |*message| null if nothing matched. Reports
// ERROR_PORT_PEER_CLOSED only once all remaining deliverable messages have
// been drained. Ports carried by the returned message become signalable here.
int Node::GetMessageIf(const PortRef& port_ref,
                       std::function<bool(const Message&)> selector,
                       ScopedMessage* message) {
  *message = nullptr;

  DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;

  Port* port = port_ref.port();
  {
    base::AutoLock lock(port->lock);

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port))
      return ERROR_PORT_PEER_CLOSED;

    port->message_queue.GetNextMessageIf(selector, message);
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      const PortName& new_port_name = (*message)->ports()[i];
      scoped_refptr<Port> new_port = GetPort(new_port_name);

      DCHECK(new_port) << "Port " << new_port_name << "@" << name_
                       << " does not exist!";

      base::AutoLock lock(new_port->lock);

      // Ports arriving in a message were created non-signalable (see
      // AcceptPort); now that the consumer has seen them, enable signaling.
      DCHECK(new_port->state == Port::kReceiving);
      new_port->message_queue.set_signalable(true);
    }
  }

  return OK;
}
| 293 |
// Sends |message| (along with any ports it carries) from |port_ref| to its
// peer, which may live on this node or a remote one. On success the message
// is consumed. Once the message has been handed to the routing layer,
// subsequent failures are swallowed and OK is returned, since the embedder
// has no way to recover at that point.
int Node::SendMessage(const PortRef& port_ref, ScopedMessage* message) {
  ScopedMessage& m = *message;

  // A port cannot be sent over itself.
  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name())
      return ERROR_PORT_CANNOT_SEND_SELF;
  }

  Port* port = port_ref.port();
  {
    // We must acquire |ports_lock_| before grabbing any port locks, because
    // WillSendMessage_Locked may need to lock multiple ports out of order.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    if (port->peer_closed)
      return ERROR_PORT_PEER_CLOSED;

    int rv = WillSendMessage_Locked(port, port_ref.name(), m.get());
    if (rv != OK)
      return rv;

    // Beyond this point there's no sense in returning anything but OK. Even if
    // message forwarding or acceptance fails, there's nothing the embedder can
    // do to recover. Assume that failure beyond this point must be treated as a
    // transport failure.

    if (port->peer_node_name != name_) {
      delegate_->ForwardMessage(port->peer_node_name, std::move(m));
      return OK;
    }
  }

  // Peer is local: deliver directly, outside of the locks above.
  int rv = AcceptMessage(std::move(m));
  if (rv != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptMessage failed: " << rv;
  }

  return OK;
}
| 337 |
// Dispatches an incoming message to the handler for its event type. User
// messages feed port message queues; the remaining event types drive the
// internal port-transfer and proxy-removal protocols.
int Node::AcceptMessage(ScopedMessage message) {
  const EventHeader* header = GetEventHeader(*message);
  switch (header->type) {
    case EventType::kUser:
      return OnUserMessage(std::move(message));
    case EventType::kPortAccepted:
      return OnPortAccepted(header->port_name);
    case EventType::kObserveProxy:
      return OnObserveProxy(
          header->port_name,
          *GetEventData<ObserveProxyEventData>(*message));
    case EventType::kObserveProxyAck:
      return OnObserveProxyAck(
          header->port_name,
          GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
    case EventType::kObserveClosure:
      return OnObserveClosure(
          header->port_name,
          GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
  }
  // Unknown event type: a protocol violation or version mismatch.
  return OOPS(ERROR_NOT_IMPLEMENTED);
}
| 360 |
// Handles loss of connectivity to |node_name|: every port peered with that
// node is marked peer-closed, non-receiving (buffering/proxying) ports
// referencing it are removed outright, and the delegate is notified about
// each affected receiving port.
int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_
           << " to node " << node_name;

  std::vector<PortRef> ports_to_notify;

  {
    base::AutoLock ports_lock(ports_lock_);

    // Manual iteration so entries can be erased mid-loop via ports_.erase().
    for (auto iter = ports_.begin(); iter != ports_.end(); ) {
      scoped_refptr<Port>& port = iter->second;

      bool remove_port = false;
      {
        base::AutoLock port_lock(port->lock);

        if (port->peer_node_name == node_name) {
          // We can no longer send messages to this port's peer. We assume we
          // will not receive any more messages from this port's peer as well.
          if (!port->peer_closed) {
            port->peer_closed = true;
            port->last_sequence_num_to_receive =
                port->message_queue.next_sequence_num() - 1;

            if (port->state == Port::kReceiving)
              ports_to_notify.push_back(PortRef(iter->first, port));
          }

          // We do not expect to forward any further messages, and we do not
          // expect to receive a Port{Accepted,Rejected} event.
          if (port->state != Port::kReceiving)
            remove_port = true;
        }
      }

      if (remove_port) {
        DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
        iter = ports_.erase(iter);
      } else {
        ++iter;
      }
    }
  }

  // Notifications are delivered after |ports_lock_| has been released.
  for (size_t i = 0; i < ports_to_notify.size(); ++i)
    delegate_->PortStatusChanged(ports_to_notify[i]);

  return OK;
}
| 413 |
// Handles an incoming user message addressed to a local port. Any ports
// carried by the message are first bound to this node; the message is then
// queued (kReceiving/kBuffering) or forwarded onward (kProxying). If the
// message cannot be accepted, the newly bound ports are closed again so they
// are not orphaned.
int Node::OnUserMessage(ScopedMessage message) {
  PortName port_name = GetEventHeader(*message)->port_name;
  const auto* event = GetEventData<UserEventData>(*message);

#if !defined(NDEBUG)
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }

  DVLOG(2) << "AcceptMessage " << event->sequence_num
           << " [ports=" << ports_buf.str() << "] at "
           << port_name << "@" << name_;
#endif

  scoped_refptr<Port> port = GetPort(port_name);

  // Even if this port does not exist, cannot receive anymore messages or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get transferred
  // following the usual method. If the message cannot be accepted, then the
  // newly bound ports will simply be closed.

  for (size_t i = 0; i < message->num_ports(); ++i) {
    int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    if (rv != OK)
      return rv;
  }

  bool has_next_message = false;
  bool message_accepted = false;

  if (port) {
    // We may want to forward messages once the port lock is held, so we must
    // acquire |ports_lock_| first.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port.get())) {
      message_accepted = true;
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        // Buffered messages are not consumable yet; suppress notification.
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;

        // Forward messages. We forward messages in sequential order here so
        // that we maintain the message queue's notion of next sequence number.
        // That's useful for the proxy removal process as we can tell when this
        // port has seen all of the messages it is expected to see.
        int rv = ForwardMessages_Locked(port.get(), port_name);
        if (rv != OK)
          return rv;

        MaybeRemoveProxy_Locked(port.get(), port_name);
      }
    }
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef port_ref;
      if (GetPort(message->ports()[i], &port_ref) == OK) {
        ClosePort(port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    // A message became readable; let the embedder know.
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}
| 496 |
// Handles a PortAccepted event: the destination node has taken ownership of
// a port previously sent from here, so the local port transitions from
// kBuffering to kProxying, flushes any buffered messages, and then begins
// (or short-circuits) the proxy removal protocol.
int Node::OnPortAccepted(const PortName& port_name) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return OOPS(ERROR_PORT_UNKNOWN);

  {
    // We must hold |ports_lock_| before grabbing the port lock because
    // ForwardMessages_Locked requires it to be held.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
             << " pointing to "
             << port->peer_port_name << "@" << port->peer_node_name;

    if (port->state != Port::kBuffering)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    port->state = Port::kProxying;

    int rv = ForwardMessages_Locked(port.get(), port_name);
    if (rv != OK)
      return rv;

    // We may have observed closure before receiving PortAccepted. In that
    // case, we can advance to removing the proxy without sending out an
    // ObserveProxy message. We already know the last expected message, etc.

    if (port->remove_proxy_on_last_message) {
      MaybeRemoveProxy_Locked(port.get(), port_name);

      // Make sure we propagate closure to our current peer.
      ObserveClosureEventData data;
      data.last_sequence_num = port->last_sequence_num_to_receive;
      delegate_->ForwardMessage(
          port->peer_node_name,
          NewInternalMessage(port->peer_port_name,
                             EventType::kObserveClosure, data));
    } else {
      InitiateProxyRemoval_Locked(port.get(), port_name);
    }
  }
  return OK;
}
| 541 |
// Handles an ObserveProxy event, which travels along the route looking for
// the port whose peer is the proxy named in |event|. If this is that port
// (and it is receiving), it re-points itself past the proxy and acks; if
// this port is itself a proxy, the ack is deferred until removal; otherwise
// the event is forwarded toward our peer.
int Node::OnObserveProxy(const PortName& port_name,
                         const ObserveProxyEventData& event) {
  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port) {
    DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
           << event.proxy_port_name << "@"
           << event.proxy_node_name << " pointing to "
           << event.proxy_to_port_name << "@"
           << event.proxy_to_node_name;

  {
    base::AutoLock lock(port->lock);

    if (port->peer_node_name == event.proxy_node_name &&
        port->peer_port_name == event.proxy_port_name) {
      if (port->state == Port::kReceiving) {
        // Skip past the proxy: point directly at the port it proxies to.
        port->peer_node_name = event.proxy_to_node_name;
        port->peer_port_name = event.proxy_to_port_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = port->next_sequence_num_to_send - 1;

        delegate_->ForwardMessage(
            event.proxy_node_name,
            NewInternalMessage(event.proxy_port_name,
                               EventType::kObserveProxyAck,
                               ack));
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // After all, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event. Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to "
                 << event.proxy_port_name << "@" << event.proxy_node_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = kInvalidSequenceNum;

        port->send_on_proxy_removal.reset(
            new std::pair<NodeName, ScopedMessage>(
                event.proxy_node_name,
                NewInternalMessage(event.proxy_port_name,
                                   EventType::kObserveProxyAck,
                                   ack)));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      delegate_->ForwardMessage(
          port->peer_node_name,
          NewInternalMessage(port->peer_port_name,
                             EventType::kObserveProxy,
                             event));
    }
  }
  return OK;
}
| 612 |
// Handles the peer's acknowledgement of our ObserveProxy event. A valid
// |last_sequence_num| tells this proxy the final message it must forward
// before it can be removed; kInvalidSequenceNum means the ack came from
// another proxy and the removal handshake must be restarted.
int Node::OnObserveProxyAck(const PortName& port_name,
                            uint64_t last_sequence_num) {
  DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
           << " (last_sequence_num=" << last_sequence_num << ")";

  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;  // The port may have observed closure first, so
                                // this is not an "Oops".

  {
    // We must acquire |ports_lock_| before the port lock because it must be
    // held for MaybeRemoveProxy_Locked.
    base::AutoLock ports_lock(ports_lock_);

    base::AutoLock lock(port->lock);

    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    if (last_sequence_num == kInvalidSequenceNum) {
      // Send again.
      InitiateProxyRemoval_Locked(port.get(), port_name);
      return OK;
    }

    // We can now remove this port once we have received and forwarded the last
    // message addressed to this port.
    port->remove_proxy_on_last_message = true;
    port->last_sequence_num_to_receive = last_sequence_num;

    MaybeRemoveProxy_Locked(port.get(), port_name);
  }
  return OK;
}
| 648 |
// Handles an ObserveClosure event: records that no messages beyond
// |last_sequence_num| will arrive on this port, forwards closure along the
// route, and — once the receiving end is reached — notifies the delegate.
// Dead-end proxies use the forwarded event to tear themselves down.
int Node::OnObserveClosure(const PortName& port_name,
                           uint64_t last_sequence_num) {
  // OK if the port doesn't exist, as it may have been closed already.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return OK;

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  bool notify_delegate = false;
  {
    // We must acquire |ports_lock_| before the port lock because it must be
    // held for MaybeRemoveProxy_Locked.
    base::AutoLock ports_lock(ports_lock_);

    base::AutoLock lock(port->lock);

    port->peer_closed = true;
    port->last_sequence_num_to_receive = last_sequence_num;

    DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << last_sequence_num << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    ObserveClosureEventData forwarded_data;

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will only
      // reach dead-end proxies. Tell them we've sent our last message so they
      // can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not actually
      // closed. Consider replacing this with a new event type.
      forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so
      // forward the message along as-is.
      forwarded_data.last_sequence_num = last_sequence_num;

      // See about removing the port if it is a proxy as our peer won't be able
      // to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying)
        MaybeRemoveProxy_Locked(port.get(), port_name);
    }

    DVLOG(2) << "Forwarding ObserveClosure from "
             << port_name << "@" << name_ << " to peer "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << forwarded_data.last_sequence_num
             << ")";

    delegate_->ForwardMessage(
        port->peer_node_name,
        NewInternalMessage(port->peer_port_name,
                           EventType::kObserveClosure, forwarded_data));
  }
  // Notify outside the lock scopes above.
  if (notify_delegate) {
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }
  return OK;
}
| 723 |
| 724 int Node::AddPortWithName(const PortName& port_name, |
| 725 const scoped_refptr<Port>& port) { |
| 726 base::AutoLock lock(ports_lock_); |
| 727 |
| 728 if (!ports_.insert(std::make_pair(port_name, port)).second) |
| 729 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. |
| 730 |
| 731 DVLOG(2) << "Created port " << port_name << "@" << name_; |
| 732 return OK; |
| 733 } |
| 734 |
| 735 void Node::ErasePort(const PortName& port_name) { |
| 736 base::AutoLock lock(ports_lock_); |
| 737 return ErasePort_Locked(port_name); |
| 738 } |
| 739 |
// Removes |port_name| from the port map. |ports_lock_| must already be held.
void Node::ErasePort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  ports_.erase(port_name);
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}
| 745 |
// Thread-safe port lookup: acquires |ports_lock_| and delegates to
// GetPort_Locked. Returns null if the port is unknown on this node.
scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  return GetPort_Locked(port_name);
}
| 750 |
| 751 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) { |
| 752 ports_lock_.AssertAcquired(); |
| 753 auto iter = ports_.find(port_name); |
| 754 if (iter == ports_.end()) |
| 755 return nullptr; |
| 756 |
| 757 return iter->second; |
| 758 } |
| 759 |
// Prepares |port| for transfer to |to_node_name|: generates the name the
// port will have on the destination, switches the local port to kBuffering,
// snapshots its routing/sequence state into |port_descriptor| for the
// receiving node, and re-points the local port at its future remote self.
// Both |ports_lock_| and the port's lock must be held.
void Node::WillSendPort_Locked(Port* port,
                               const NodeName& to_node_name,
                               PortName* port_name,
                               PortDescriptor* port_descriptor) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  PortName local_port_name = *port_name;

  PortName new_port_name;
  delegate_->GenerateRandomPortName(&new_port_name);

  // Make sure we don't send messages to the new peer until after we know it
  // exists. In the meantime, just buffer messages locally.
  DCHECK(port->state == Port::kReceiving);
  port->state = Port::kBuffering;

  // If we already know our peer is closed, we already know this proxy can
  // be removed once it receives and forwards its last expected message.
  if (port->peer_closed)
    port->remove_proxy_on_last_message = true;

  // |*port_name| is an in/out parameter: it now names the port as the
  // destination node will know it.
  *port_name = new_port_name;

  port_descriptor->peer_node_name = port->peer_node_name;
  port_descriptor->peer_port_name = port->peer_port_name;
  port_descriptor->referring_node_name = name_;
  port_descriptor->referring_port_name = local_port_name;
  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
  port_descriptor->next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  port_descriptor->last_sequence_num_to_receive =
      port->last_sequence_num_to_receive;
  port_descriptor->peer_closed = port->peer_closed;

  // Configure the local port to point to the new port.
  port->peer_node_name = to_node_name;
  port->peer_port_name = new_port_name;
}
| 799 |
// Binds a port received from another node, reconstructing its receiving-side
// state from |port_descriptor|, then tells the referring node (via a
// PortAccepted event) that it may start forwarding buffered messages here.
int Node::AcceptPort(const PortName& port_name,
                     const PortDescriptor& port_descriptor) {
  scoped_refptr<Port> port = make_scoped_refptr(
      new Port(port_descriptor.next_sequence_num_to_send,
               port_descriptor.next_sequence_num_to_receive));
  port->state = Port::kReceiving;
  port->peer_node_name = port_descriptor.peer_node_name;
  port->peer_port_name = port_descriptor.peer_port_name;
  port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
           << port->peer_closed << "; last_sequence_num_to_receive="
           << port->last_sequence_num_to_receive << "]";

  // A newly accepted port is not signalable until the message referencing the
  // new port finds its way to the consumer (see GetMessageIf).
  port->message_queue.set_signalable(false);

  int rv = AddPortWithName(port_name, port);
  if (rv != OK)
    return rv;

  // Allow referring port to forward messages.
  delegate_->ForwardMessage(
      port_descriptor.referring_node_name,
      NewInternalMessage(port_descriptor.referring_port_name,
                         EventType::kPortAccepted));
  return OK;
}
| 831 |
| 832 int Node::WillSendMessage_Locked(Port* port, |
| 833 const PortName& port_name, |
| 834 Message* message) { |
| 835 ports_lock_.AssertAcquired(); |
| 836 port->lock.AssertAcquired(); |
| 837 |
| 838 DCHECK(message); |
| 839 |
| 840 // Messages may already have a sequence number if they're being forwarded |
| 841 // by a proxy. Otherwise, use the next outgoing sequence number. |
| 842 uint64_t* sequence_num = |
| 843 &GetMutableEventData<UserEventData>(message)->sequence_num; |
| 844 if (*sequence_num == 0) |
| 845 *sequence_num = port->next_sequence_num_to_send++; |
| 846 |
| 847 #if !defined(NDEBUG) |
| 848 std::ostringstream ports_buf; |
| 849 for (size_t i = 0; i < message->num_ports(); ++i) { |
| 850 if (i > 0) |
| 851 ports_buf << ","; |
| 852 ports_buf << message->ports()[i]; |
| 853 } |
| 854 #endif |
| 855 |
| 856 if (message->num_ports() > 0) { |
| 857 // Note: Another thread could be trying to send the same ports, so we need |
| 858 // to ensure that they are ours to send before we mutate their state. |
| 859 |
| 860 std::vector<scoped_refptr<Port>> ports; |
| 861 ports.resize(message->num_ports()); |
| 862 |
| 863 { |
| 864 for (size_t i = 0; i < message->num_ports(); ++i) { |
| 865 ports[i] = GetPort_Locked(message->ports()[i]); |
| 866 ports[i]->lock.Acquire(); |
| 867 |
| 868 int error = OK; |
| 869 if (ports[i]->state != Port::kReceiving) |
| 870 error = ERROR_PORT_STATE_UNEXPECTED; |
| 871 else if (message->ports()[i] == port->peer_port_name) |
| 872 error = ERROR_PORT_CANNOT_SEND_PEER; |
| 873 |
| 874 if (error != OK) { |
| 875 // Oops, we cannot send this port. |
| 876 for (size_t j = 0; j <= i; ++j) |
| 877 ports[i]->lock.Release(); |
| 878 // Backpedal on the sequence number. |
| 879 port->next_sequence_num_to_send--; |
| 880 return error; |
| 881 } |
| 882 } |
| 883 } |
| 884 |
| 885 PortDescriptor* port_descriptors = |
| 886 GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message)); |
| 887 |
| 888 for (size_t i = 0; i < message->num_ports(); ++i) { |
| 889 WillSendPort_Locked(ports[i].get(), |
| 890 port->peer_node_name, |
| 891 message->mutable_ports() + i, |
| 892 port_descriptors + i); |
| 893 } |
| 894 |
| 895 for (size_t i = 0; i < message->num_ports(); ++i) |
| 896 ports[i]->lock.Release(); |
| 897 } |
| 898 |
| 899 #if !defined(NDEBUG) |
| 900 DVLOG(2) << "Sending message " |
| 901 << GetEventData<UserEventData>(*message)->sequence_num |
| 902 << " [ports=" << ports_buf.str() << "]" |
| 903 << " from " << port_name << "@" << name_ |
| 904 << " to " << port->peer_port_name << "@" << port->peer_node_name; |
| 905 #endif |
| 906 |
| 907 GetMutableEventHeader(message)->port_name = port->peer_port_name; |
| 908 return OK; |
| 909 } |
| 910 |
| 911 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
| 912 ports_lock_.AssertAcquired(); |
| 913 port->lock.AssertAcquired(); |
| 914 |
| 915 for (;;) { |
| 916 ScopedMessage message; |
| 917 port->message_queue.GetNextMessageIf(nullptr, &message); |
| 918 if (!message) |
| 919 break; |
| 920 |
| 921 int rv = WillSendMessage_Locked(port, port_name, message.get()); |
| 922 if (rv != OK) |
| 923 return rv; |
| 924 |
| 925 delegate_->ForwardMessage(port->peer_node_name, std::move(message)); |
| 926 } |
| 927 return OK; |
| 928 } |
| 929 |
| 930 void Node::InitiateProxyRemoval_Locked(Port* port, |
| 931 const PortName& port_name) { |
| 932 port->lock.AssertAcquired(); |
| 933 |
| 934 // To remove this node, we start by notifying the connected graph that we are |
| 935 // a proxy. This allows whatever port is referencing this node to skip it. |
| 936 // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if |
| 937 // the peer was closed in the meantime). |
| 938 |
| 939 ObserveProxyEventData data; |
| 940 data.proxy_node_name = name_; |
| 941 data.proxy_port_name = port_name; |
| 942 data.proxy_to_node_name = port->peer_node_name; |
| 943 data.proxy_to_port_name = port->peer_port_name; |
| 944 |
| 945 delegate_->ForwardMessage( |
| 946 port->peer_node_name, |
| 947 NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data)); |
| 948 } |
| 949 |
| 950 void Node::MaybeRemoveProxy_Locked(Port* port, |
| 951 const PortName& port_name) { |
| 952 // |ports_lock_| must be held so we can potentilaly ErasePort_Locked(). |
| 953 ports_lock_.AssertAcquired(); |
| 954 port->lock.AssertAcquired(); |
| 955 |
| 956 DCHECK(port->state == Port::kProxying); |
| 957 |
| 958 // Make sure we have seen ObserveProxyAck before removing the port. |
| 959 if (!port->remove_proxy_on_last_message) |
| 960 return; |
| 961 |
| 962 if (!CanAcceptMoreMessages(port)) { |
| 963 // This proxy port is done. We can now remove it! |
| 964 ErasePort_Locked(port_name); |
| 965 |
| 966 if (port->send_on_proxy_removal) { |
| 967 NodeName to_node = port->send_on_proxy_removal->first; |
| 968 ScopedMessage& message = port->send_on_proxy_removal->second; |
| 969 |
| 970 delegate_->ForwardMessage(to_node, std::move(message)); |
| 971 } |
| 972 } else { |
| 973 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_ |
| 974 << " now; waiting for more messages"; |
| 975 } |
| 976 } |
| 977 |
| 978 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, |
| 979 const EventType& type, |
| 980 const void* data, |
| 981 size_t num_data_bytes) { |
| 982 ScopedMessage message; |
| 983 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); |
| 984 |
| 985 EventHeader* header = GetMutableEventHeader(message.get()); |
| 986 header->port_name = port_name; |
| 987 header->type = type; |
| 988 header->padding = 0; |
| 989 |
| 990 if (num_data_bytes) |
| 991 memcpy(header + 1, data, num_data_bytes); |
| 992 |
| 993 return message; |
| 994 } |
| 995 |
| 996 } // namespace ports |
| 997 } // namespace edk |
| 998 } // namespace mojo |
OLD | NEW |