OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/edk/system/node_controller.h" |
| 6 |
| 7 #include <algorithm> |
| 8 |
| 9 #include "base/bind.h" |
| 10 #include "base/location.h" |
| 11 #include "base/logging.h" |
| 12 #include "base/macros.h" |
| 13 #include "base/message_loop/message_loop.h" |
| 14 #include "crypto/random.h" |
| 15 #include "mojo/edk/embedder/embedder_internal.h" |
| 16 #include "mojo/edk/embedder/platform_channel_pair.h" |
| 17 #include "mojo/edk/embedder/platform_support.h" |
| 18 #include "mojo/edk/system/core.h" |
| 19 #include "mojo/edk/system/ports_message.h" |
| 20 |
| 21 namespace mojo { |
| 22 namespace edk { |
| 23 |
| 24 namespace { |
| 25 |
// Fills |*out| with cryptographically strong random bytes. T must be a
// trivially copyable fixed-size type (e.g. ports::NodeName).
template <typename T>
void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
| 28 |
// Returns a freshly generated, cryptographically random node name.
ports::NodeName GetRandomNodeName() {
  ports::NodeName name;
  GenerateRandomName(&name);
  return name;
}
| 34 |
// Used by NodeController to watch for shutdown. Since no IO can happen once
// the IO thread is killed, the NodeController can cleanly drop all its peers
// at that time.
class ThreadDestructionObserver :
    public base::MessageLoop::DestructionObserver {
 public:
  // Installs an observer on |task_runner|'s thread that runs |callback| when
  // that thread's MessageLoop is destroyed. Callable from any thread: if we
  // aren't on the target thread, installation is re-posted to it.
  static void Create(scoped_refptr<base::TaskRunner> task_runner,
                     const base::Closure& callback) {
    if (task_runner->RunsTasksOnCurrentThread()) {
      // Owns itself; deleted in WillDestroyCurrentMessageLoop().
      new ThreadDestructionObserver(callback);
    } else {
      task_runner->PostTask(FROM_HERE,
                            base::Bind(&Create, task_runner, callback));
    }
  }

 private:
  explicit ThreadDestructionObserver(const base::Closure& callback)
      : callback_(callback) {
    base::MessageLoop::current()->AddDestructionObserver(this);
  }

  ~ThreadDestructionObserver() override {
    base::MessageLoop::current()->RemoveDestructionObserver(this);
  }

  // base::MessageLoop::DestructionObserver:
  void WillDestroyCurrentMessageLoop() override {
    callback_.Run();
    delete this;
  }

  const base::Closure callback_;

  DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
};
| 72 |
| 73 } // namespace |
| 74 |
// Out-of-line definitions of trivial constructors/destructors for the nested
// helper structs (kept out-of-line so the structs can hold types that are
// only forward-declared in the header).
NodeController::PendingPortRequest::PendingPortRequest() {}

NodeController::PendingPortRequest::~PendingPortRequest() {}

NodeController::ReservedPort::ReservedPort() {}

NodeController::ReservedPort::~ReservedPort() {}

NodeController::~NodeController() {}
| 84 |
// Creates a controller with a random node name; |this| serves as the
// delegate/observer for the underlying ports::Node.
NodeController::NodeController(Core* core)
    : core_(core),
      name_(GetRandomNodeName()),
      node_(new ports::Node(name_, this)) {
  DVLOG(1) << "Initializing node " << name_;
}
| 91 |
// Records the IO task runner and arranges for DropAllPeers() to run when the
// IO thread's message loop is destroyed, so no channel outlives the thread
// that services it. base::Unretained is safe: peers (and, if requested,
// |this|) are torn down by that same destruction callback.
void NodeController::SetIOTaskRunner(
    scoped_refptr<base::TaskRunner> task_runner) {
  io_task_runner_ = task_runner;
  ThreadDestructionObserver::Create(
      io_task_runner_,
      base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
}
| 99 |
// Begins accepting a child process on |platform_handle|. Work is bounced to
// the IO thread where all channel management happens.
void NodeController::ConnectToChild(base::ProcessHandle process_handle,
                                    ScopedPlatformHandle platform_handle) {
  io_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&NodeController::ConnectToChildOnIOThread,
                 base::Unretained(this),
                 process_handle,
                 base::Passed(&platform_handle)));
}
| 109 |
// Begins bootstrapping a connection to the parent node over
// |platform_handle|; see ConnectToParentOnIOThread for the handshake.
void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
  io_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&NodeController::ConnectToParentOnIOThread,
                 base::Unretained(this),
                 base::Passed(&platform_handle)));
}
| 117 |
// Attaches |observer| to |port| as its user data; PortStatusChanged() later
// retrieves it to deliver status notifications. Pass null to detach.
void NodeController::SetPortObserver(
    const ports::PortRef& port,
    const scoped_refptr<PortObserver>& observer) {
  node_->SetUserData(port, observer);
}
| 123 |
// Detaches any observer and closes |port|, then flushes messages that the
// close may have generated (e.g. peer-closed notifications looped back to
// this node via ForwardMessage).
void NodeController::ClosePort(const ports::PortRef& port) {
  SetPortObserver(port, nullptr);
  int rv = node_->ClosePort(port);
  DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();

  AcceptIncomingMessages();
}
| 131 |
// Sends |*message| from |port|. Ownership passes to the ports layer on
// success; on failure ownership is returned to the caller through |message|
// so it can inspect or retry. Always flushes the incoming-message queue,
// since SendMessage may synchronously loop messages back to this node.
int NodeController::SendMessage(const ports::PortRef& port,
                                scoped_ptr<PortsMessage>* message) {
  ports::ScopedMessage ports_message(message->release());
  int rv = node_->SendMessage(port, &ports_message);
  if (rv != ports::OK) {
    DCHECK(ports_message);
    message->reset(static_cast<PortsMessage*>(ports_message.release()));
  }

  AcceptIncomingMessages();
  return rv;
}
| 144 |
// Creates an uninitialized local port and parks it under |token|. The
// reservation is later consumed by ConnectReservedPorts() or by a child's
// RequestPortConnection carrying the same token, at which point |callback|
// runs with the initialized port.
void NodeController::ReservePort(const std::string& token,
                                 const ReservePortCallback& callback) {
  ports::PortRef port;
  node_->CreateUninitializedPort(&port);

  DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
           << token;

  base::AutoLock lock(reserved_ports_lock_);
  ReservedPort reservation;
  reservation.local_port = port;
  reservation.callback = callback;
  reserved_ports_.insert(std::make_pair(token, reservation));
}
| 159 |
// Allocates a shared memory buffer directly in this process.
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
    size_t num_bytes) {
  // TODO: Broker through the parent over a sync channel. :( Sandboxed child
  // processes may not be able to allocate shared memory themselves.
  return internal::g_platform_support->CreateSharedBuffer(num_bytes);
}
| 165 |
// Asks the parent to connect |local_port| to whatever port it reserved under
// |token|; |callback| runs once the connection is established. Handled on
// the IO thread.
void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
                                         const std::string& token,
                                         const base::Closure& callback) {
  io_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
                 base::Unretained(this), local_port, token, callback));
}
| 174 |
| 175 void NodeController::ConnectReservedPorts(const std::string& token1, |
| 176 const std::string& token2) { |
| 177 ReservedPort port1; |
| 178 ReservedPort port2; |
| 179 { |
| 180 base::AutoLock lock(reserved_ports_lock_); |
| 181 auto it1 = reserved_ports_.find(token1); |
| 182 if (it1 == reserved_ports_.end()) |
| 183 return; |
| 184 auto it2 = reserved_ports_.find(token2); |
| 185 if (it2 == reserved_ports_.end()) |
| 186 return; |
| 187 port1 = it1->second; |
| 188 port2 = it2->second; |
| 189 reserved_ports_.erase(it1); |
| 190 reserved_ports_.erase(it2); |
| 191 } |
| 192 |
| 193 node_->InitializePort(port1.local_port, name_, port2.local_port.name()); |
| 194 node_->InitializePort(port2.local_port, name_, port1.local_port.name()); |
| 195 port1.callback.Run(port1.local_port); |
| 196 port2.callback.Run(port2.local_port); |
| 197 } |
| 198 |
// Records |callback| to be invoked once the node can shut down cleanly, and
// immediately attempts shutdown in case it already can.
void NodeController::RequestShutdown(const base::Closure& callback) {
  {
    base::AutoLock lock(shutdown_lock_);
    shutdown_callback_ = callback;
  }

  AttemptShutdownIfRequested();
}
| 207 |
// IO-thread half of ConnectToChild. We don't yet know the child's real node
// name, so the channel is keyed under a random placeholder |token|; the child
// echoes the token back in AcceptParent (see OnAcceptParent), which both
// authenticates the reply and lets us re-key the channel.
void NodeController::ConnectToChildOnIOThread(
    base::ProcessHandle process_handle,
    ScopedPlatformHandle platform_handle) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  scoped_refptr<NodeChannel> channel =
      NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);

  ports::NodeName token;
  GenerateRandomName(&token);

  channel->SetRemoteNodeName(token);
  channel->SetRemoteProcessHandle(process_handle);
  channel->Start();
  channel->AcceptChild(name_, token);

  // Parked until the child replies with AcceptParent; only touched on the IO
  // thread (aside from DropAllPeers/DropPeer, which also run here).
  pending_children_.insert(std::make_pair(token, channel));
}
| 226 |
// IO-thread half of ConnectToParent: start the bootstrap channel and wait
// for the parent's AcceptChild message (see OnAcceptChild).
void NodeController::ConnectToParentOnIOThread(
    ScopedPlatformHandle platform_handle) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  base::AutoLock lock(parent_lock_);
  DCHECK(parent_name_ == ports::kInvalidNodeName);

  // At this point we don't know the parent's name, so we can't yet insert it
  // into our |peers_| map. That will happen as soon as we receive an
  // AcceptChild message from them.
  bootstrap_parent_channel_ =
      NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
  bootstrap_parent_channel_->Start();
}
| 241 |
// IO-thread half of ConnectToParentPort. If the parent handshake hasn't
// completed yet the request is queued (flushed in OnAcceptChild); otherwise
// the connection request is sent immediately and |callback| is parked until
// the parent replies with ConnectToPort (see OnConnectToPort).
void NodeController::RequestParentPortConnectionOnIOThread(
    const ports::PortRef& local_port,
    const std::string& token,
    const base::Closure& callback) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  scoped_refptr<NodeChannel> parent = GetParentChannel();
  if (!parent) {
    PendingPortRequest request;
    request.token = token;
    request.local_port = local_port;
    request.callback = callback;
    pending_port_requests_.push_back(request);
    return;
  }

  pending_port_connections_.insert(std::make_pair(local_port.name(), callback));
  parent->RequestPortConnection(local_port.name(), token);
}
| 261 |
| 262 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
| 263 const ports::NodeName& name) { |
| 264 base::AutoLock lock(peers_lock_); |
| 265 auto it = peers_.find(name); |
| 266 if (it == peers_.end()) |
| 267 return nullptr; |
| 268 return it->second; |
| 269 } |
| 270 |
| 271 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { |
| 272 ports::NodeName parent_name; |
| 273 { |
| 274 base::AutoLock lock(parent_lock_); |
| 275 parent_name = parent_name_; |
| 276 } |
| 277 return GetPeerChannel(parent_name); |
| 278 } |
| 279 |
// Registers |channel| as the channel for peer |name| and flushes any
// messages queued for that peer. Duplicate registrations (possible when two
// nodes race to introduce each other) are ignored. IO thread only.
void NodeController::AddPeer(const ports::NodeName& name,
                             scoped_refptr<NodeChannel> channel,
                             bool start_channel) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  DCHECK(name != ports::kInvalidNodeName);
  DCHECK(channel);

  channel->SetRemoteNodeName(name);

  // NOTE: |peers_lock_| is held for the remainder of the function, covering
  // both |peers_| and |pending_peer_messages_|.
  base::AutoLock lock(peers_lock_);
  if (peers_.find(name) != peers_.end()) {
    // This can happen normally if two nodes race to be introduced to each
    // other. The losing pipe will be silently closed and introduction should
    // not be affected.
    DVLOG(1) << "Ignoring duplicate peer name " << name;
    return;
  }

  auto result = peers_.insert(std::make_pair(name, channel));
  DCHECK(result.second);

  DVLOG(2) << "Accepting new peer " << name << " on node " << name_;

  if (start_channel)
    channel->Start();

  // Flush any queued message we need to deliver to this node.
  OutgoingMessageQueue pending_messages;
  auto it = pending_peer_messages_.find(name);
  if (it != pending_peer_messages_.end()) {
    auto& message_queue = it->second;
    while (!message_queue.empty()) {
      ports::ScopedMessage message = std::move(message_queue.front());
      channel->PortsMessage(
          static_cast<PortsMessage*>(message.get())->TakeChannelMessage());
      message_queue.pop();
    }
    pending_peer_messages_.erase(it);
  }
}
| 321 |
// Removes all state for peer |name| (established channel, queued messages,
// pending-child entry) and tells the ports layer the connection is gone so
// it can tear down ports routed through that node. IO thread only.
void NodeController::DropPeer(const ports::NodeName& name) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(peers_lock_);
    auto it = peers_.find(name);

    if (it != peers_.end()) {
      ports::NodeName peer = it->first;
      peers_.erase(it);
      DVLOG(1) << "Dropped peer " << peer;
    }

    pending_peer_messages_.erase(name);
    pending_children_.erase(name);
  }

  // Notify outside the lock; this may generate port status events.
  node_->LostConnectionToNode(name);
}
| 341 |
// Delivers |message| to peer |name|. On Windows, messages carrying platform
// handles are relayed through the parent (the parent can duplicate handles
// between child processes; children generally cannot). Messages for unknown
// peers are queued and an introduction is requested from the parent.
void NodeController::SendPeerMessage(const ports::NodeName& name,
                                     ports::ScopedMessage message) {
  PortsMessage* ports_message = static_cast<PortsMessage*>(message.get());

#if defined(OS_WIN)
  // If we're sending a message with handles and we're not the parent,
  // relay the message through the parent.
  if (ports_message->has_handles()) {
    scoped_refptr<NodeChannel> parent = GetParentChannel();
    if (parent) {
      parent->RelayPortsMessage(name, ports_message->TakeChannelMessage());
      return;
    }
  }
#endif

  scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
  if (peer) {
    peer->PortsMessage(ports_message->TakeChannelMessage());
    return;
  }

  // If we don't know who the peer is, queue the message for delivery. If this
  // is the first message queued for the peer, we also ask the parent to
  // introduce us to them.

  bool needs_introduction = false;
  {
    base::AutoLock lock(peers_lock_);
    auto& queue = pending_peer_messages_[name];
    needs_introduction = queue.empty();
    queue.emplace(std::move(message));
  }

  if (needs_introduction) {
    scoped_refptr<NodeChannel> parent = GetParentChannel();
    if (!parent) {
      // No parent to ask; the message stays queued (and effectively dropped
      // unless this peer is later introduced some other way).
      DVLOG(1) << "Dropping message for unknown peer: " << name;
      return;
    }
    parent->RequestIntroduction(name);
  }
}
| 385 |
// Drains |incoming_messages_| (messages this node forwarded to itself — see
// ForwardMessage) and feeds them to the ports layer. Processing a message
// may enqueue more, hence the outer loop; the queue is swapped out under
// |messages_lock_| so AcceptMessage runs unlocked.
void NodeController::AcceptIncomingMessages() {
  std::queue<ports::ScopedMessage> messages;
  for (;;) {
    // TODO: We may need to be more careful to avoid starving the rest of the
    // thread here. Revisit this if it turns out to be a problem. One
    // alternative would be to schedule a task to continue pumping messages
    // after flushing once.

    {
      base::AutoLock lock(messages_lock_);
      if (incoming_messages_.empty())
        break;
      std::swap(messages, incoming_messages_);
    }

    while (!messages.empty()) {
      node_->AcceptMessage(std::move(messages.front()));
      messages.pop();
    }
  }
  AttemptShutdownIfRequested();
}
| 408 |
// Shuts down every channel (bootstrap, established peers, pending children)
// and clears all peer state. Runs on the IO thread — installed as the
// thread-destruction callback in SetIOTaskRunner. Channels are collected
// under |peers_lock_| but shut down outside it, since ShutDown may re-enter
// channel error paths. May delete |this| (see DestroyOnIOThreadShutdown).
void NodeController::DropAllPeers() {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(parent_lock_);
    if (bootstrap_parent_channel_) {
      bootstrap_parent_channel_->ShutDown();
      bootstrap_parent_channel_ = nullptr;
    }
  }

  std::vector<scoped_refptr<NodeChannel>> all_peers;
  {
    base::AutoLock lock(peers_lock_);
    for (const auto& peer : peers_)
      all_peers.push_back(peer.second);
    for (const auto& peer : pending_children_)
      all_peers.push_back(peer.second);
    peers_.clear();
    pending_children_.clear();
    pending_peer_messages_.clear();
  }

  for (const auto& peer : all_peers)
    peer->ShutDown();

  if (destroy_on_io_thread_shutdown_)
    delete this;
}
| 438 |
// ports::NodeDelegate-style hook: produces a random name for a new port.
void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
  GenerateRandomName(port_name);
}
| 442 |
// Allocates an empty PortsMessage with room for |num_header_bytes| of header
// and no payload, ports, or backing channel message.
void NodeController::AllocMessage(size_t num_header_bytes,
                                  ports::ScopedMessage* message) {
  message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
}
| 447 |
// Routes |message| toward |node|: messages addressed to this node are queued
// locally (drained by AcceptIncomingMessages) rather than delivered
// synchronously; anything else goes out via SendPeerMessage.
void NodeController::ForwardMessage(const ports::NodeName& node,
                                    ports::ScopedMessage message) {
  if (node == name_) {
    // NOTE: We need to avoid re-entering the Node instance within
    // ForwardMessage. Because ForwardMessage is only ever called
    // (synchronously) in response to Node's ClosePort, SendMessage, or
    // AcceptMessage, we flush the queue after calling any of those methods.
    base::AutoLock lock(messages_lock_);
    incoming_messages_.emplace(std::move(message));
  } else {
    SendPeerMessage(node, std::move(message));
  }
}
| 461 |
// Forwards a port status change to the PortObserver stored as the port's
// user data (see SetPortObserver). Ports without an observer are ignored.
void NodeController::PortStatusChanged(const ports::PortRef& port) {
  scoped_refptr<ports::UserData> user_data;
  node_->GetUserData(port, &user_data);

  // Safe downcast: SetPortObserver is the only writer of this user data.
  PortObserver* observer = static_cast<PortObserver*>(user_data.get());
  if (observer) {
    observer->OnPortStatusChanged();
  } else {
    DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
             << "doesn't have an observer.";
  }
}
| 474 |
// Handles the parent's half of the bootstrap handshake in the child: records
// the parent's name, replies with AcceptParent (echoing |token|), flushes
// port-connection requests that were queued before the parent was known, and
// promotes the bootstrap channel to a regular peer. IO thread only.
void NodeController::OnAcceptChild(const ports::NodeName& from_node,
                                   const ports::NodeName& parent_name,
                                   const ports::NodeName& token) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  scoped_refptr<NodeChannel> parent;
  {
    base::AutoLock lock(parent_lock_);
    // Reject if we never initiated a parent bootstrap or already have one.
    if (!bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) {
      DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
      DropPeer(from_node);
      return;
    }

    parent_name_ = parent_name;
    parent = bootstrap_parent_channel_;
    bootstrap_parent_channel_ = nullptr;
  }

  parent->AcceptParent(token, name_);
  for (const auto& request : pending_port_requests_) {
    pending_port_connections_.insert(
        std::make_pair(request.local_port.name(), request.callback));
    parent->RequestPortConnection(request.local_port.name(), request.token);
  }
  pending_port_requests_.clear();

  DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;

  AddPeer(parent_name_, parent, false /* start_channel */);
}
| 506 |
// Handles the child's reply in the parent: validates that the echoed |token|
// matches the placeholder name this channel was parked under (see
// ConnectToChildOnIOThread), then re-keys the channel under the child's real
// name. IO thread only.
void NodeController::OnAcceptParent(const ports::NodeName& from_node,
                                    const ports::NodeName& token,
                                    const ports::NodeName& child_name) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  auto it = pending_children_.find(from_node);
  // |from_node| is still the placeholder token here, so both checks compare
  // against the token we generated.
  if (it == pending_children_.end() || token != from_node) {
    DLOG(ERROR) << "Received unexpected AcceptParent message from "
                << from_node;
    DropPeer(from_node);
    return;
  }

  scoped_refptr<NodeChannel> channel = it->second;
  pending_children_.erase(it);

  DCHECK(channel);

  DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;

  AddPeer(child_name, channel, false /* start_channel */);
}
| 529 |
// Handles an incoming ports-layer message from a peer: parses the section
// sizes out of the raw channel payload, wraps it in a PortsMessage (which
// takes ownership of |channel_message|), and hands it to the ports node.
// IO thread only.
void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  void* data;
  size_t num_data_bytes;
  NodeChannel::GetPortsMessageData(
      channel_message.get(), &data, &num_data_bytes);

  size_t num_header_bytes, num_payload_bytes, num_ports_bytes;
  ports::Message::Parse(data,
                        num_data_bytes,
                        &num_header_bytes,
                        &num_payload_bytes,
                        &num_ports_bytes);

  CHECK(channel_message);
  ports::ScopedMessage message(
      new PortsMessage(num_header_bytes,
                       num_payload_bytes,
                       num_ports_bytes,
                       std::move(channel_message)));

  node_->AcceptMessage(std::move(message));
  // AcceptMessage may loop messages back to this node; drain them now.
  AcceptIncomingMessages();
  AttemptShutdownIfRequested();
}
| 556 |
// Handles a child's request to connect its |connector_port_name| to the port
// this node reserved under |token| (see ReservePort): consumes the
// reservation, initializes the reserved port against the child's port,
// acknowledges with ConnectToPort, and runs the reservation callback.
// Unknown tokens or unknown requesters are ignored. IO thread only.
void NodeController::OnRequestPortConnection(
    const ports::NodeName& from_node,
    const ports::PortName& connector_port_name,
    const std::string& token) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
           << token << " and port " << connector_port_name << "@" << from_node;

  ReservePortCallback callback;
  ports::PortRef local_port;
  {
    base::AutoLock lock(reserved_ports_lock_);
    auto it = reserved_ports_.find(token);
    if (it == reserved_ports_.end()) {
      DVLOG(1) << "Ignoring request to connect to port for unknown token "
               << token;
      return;
    }
    local_port = it->second.local_port;
    callback = it->second.callback;
    reserved_ports_.erase(it);
  }

  DCHECK(!callback.is_null());

  scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
  if (!peer) {
    // NOTE: the reservation has already been consumed at this point.
    DVLOG(1) << "Ignoring request to connect to port from unknown node "
             << from_node;
    return;
  }

  // This reserved port should not have been initialized yet.
  CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
                                            connector_port_name));

  peer->ConnectToPort(local_port.name(), connector_port_name);
  callback.Run(local_port);
}
| 597 |
| 598 void NodeController::OnConnectToPort( |
| 599 const ports::NodeName& from_node, |
| 600 const ports::PortName& connector_port_name, |
| 601 const ports::PortName& connectee_port_name) { |
| 602 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 603 |
| 604 DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port " |
| 605 << connectee_port_name << " to port " << connector_port_name << "@" |
| 606 << from_node; |
| 607 |
| 608 ports::PortRef connectee_port; |
| 609 int rv = node_->GetPort(connectee_port_name, &connectee_port); |
| 610 if (rv != ports::OK) { |
| 611 DLOG(ERROR) << "Ignoring ConnectToPort for unknown port " |
| 612 << connectee_port_name; |
| 613 return; |
| 614 } |
| 615 |
| 616 // It's OK if this port has already been initialized. This message is only |
| 617 // sent by the remote peer to ensure the port is ready before it starts |
| 618 // us sending messages to it. |
| 619 ports::PortStatus port_status; |
| 620 rv = node_->GetStatus(connectee_port, &port_status); |
| 621 if (rv == ports::OK) { |
| 622 DVLOG(1) << "Ignoring ConnectToPort for already-initialized port " |
| 623 << connectee_port_name; |
| 624 return; |
| 625 } |
| 626 |
| 627 CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node, |
| 628 connector_port_name)); |
| 629 |
| 630 auto it = pending_port_connections_.find(connectee_port_name); |
| 631 DCHECK(it != pending_port_connections_.end()); |
| 632 it->second.Run(); |
| 633 pending_port_connections_.erase(it); |
| 634 } |
| 635 |
// Handles a child's request to be introduced to peer |name|. Only the root
// (parent-less) node performs introductions: it mints a fresh channel pair
// and hands one end to each party. If the target is unknown, the requestor
// gets an Introduce with an invalid handle so it can drop queued messages.
// IO thread only.
void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
                                           const ports::NodeName& name) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
  // Reject self-introduction, invalid targets, and unknown requestors.
  if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
    DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
                << from_node;
    DropPeer(from_node);
    return;
  }

  // Having a parent means we are not the root, so we must not introduce.
  if (GetParentChannel() != nullptr) {
    DLOG(ERROR) << "Non-parent node cannot introduce peers to each other.";
    return;
  }

  scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
  if (!new_friend) {
    // We don't know who they're talking about!
    requestor->Introduce(name, ScopedPlatformHandle());
  } else {
    PlatformChannelPair new_channel;
    requestor->Introduce(name, new_channel.PassServerHandle());
    new_friend->Introduce(from_node, new_channel.PassClientHandle());
  }
}
| 663 |
// Handles an Introduce message (only trusted from our parent): wraps the
// provided handle in a new channel for peer |name| and registers it, which
// also flushes any messages queued for that peer. An invalid handle means
// the parent couldn't introduce us; drop the queued messages. IO thread
// only.
void NodeController::OnIntroduce(const ports::NodeName& from_node,
                                 const ports::NodeName& name,
                                 ScopedPlatformHandle channel_handle) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(parent_lock_);
    if (from_node != parent_name_) {
      DLOG(ERROR) << "Received unexpected Introduce message from node "
                  << from_node;
      DropPeer(from_node);
      return;
    }
  }

  if (!channel_handle.is_valid()) {
    DLOG(ERROR) << "Could not be introduced to peer " << name;
    base::AutoLock lock(peers_lock_);
    pending_peer_messages_.erase(name);
    return;
  }

  scoped_refptr<NodeChannel> channel =
      NodeChannel::Create(this, std::move(channel_handle), io_task_runner_);

  DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
  AddPeer(name, channel, true /* start_channel */);
}
| 692 |
#if defined(OS_WIN)
// Windows only: relays a handle-carrying message on behalf of a child (see
// SendPeerMessage). Only the root node — which has real process handles for
// its children — may relay; a node that itself has a parent refuses.
// Handles are first duplicated from the sender's process into this one.
// IO-thread assumptions match the other On* handlers, though no DCHECK is
// present here.
void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
                                         base::ProcessHandle from_process,
                                         const ports::NodeName& destination,
                                         Channel::MessagePtr message) {
  scoped_refptr<NodeChannel> parent = GetParentChannel();
  if (parent) {
    // Only the parent should be asked to relay a message.
    DLOG(ERROR) << "Non-parent refusing to relay message.";
    DropPeer(from_node);
    return;
  }

  // The parent should always know which process this came from.
  DCHECK(from_process != base::kNullProcessHandle);

  // Duplicate the handles to this (the parent) process. If the message is
  // destined for another child process, the handles will be duplicated to
  // that process before going out (see NodeChannel::WriteChannelMessage).
  //
  // TODO: We could avoid double-duplication.
  for (size_t i = 0; i < message->num_handles(); ++i) {
    // DUPLICATE_CLOSE_SOURCE closes the child's copy so the handle isn't
    // leaked in the sending process.
    BOOL result = DuplicateHandle(
        from_process, message->handles()[i].handle,
        base::GetCurrentProcessHandle(),
        reinterpret_cast<HANDLE*>(message->handles() + i),
        0, FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
    DCHECK(result);
  }
  if (destination == name_) {
    // Great, we can deliver this message locally.
    OnPortsMessage(std::move(message));
    return;
  }

  scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
  if (peer)
    peer->PortsMessage(std::move(message));
  else
    DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
}
#endif
| 735 |
| 736 void NodeController::OnChannelError(const ports::NodeName& from_node) { |
| 737 if (io_task_runner_->RunsTasksOnCurrentThread()) { |
| 738 DropPeer(from_node); |
| 739 } else { |
| 740 io_task_runner_->PostTask( |
| 741 FROM_HERE, |
| 742 base::Bind(&NodeController::DropPeer, base::Unretained(this), |
| 743 from_node)); |
| 744 } |
| 745 } |
| 746 |
// Arms self-deletion: once set, DropAllPeers() (run at IO-thread shutdown)
// ends with `delete this`.
void NodeController::DestroyOnIOThreadShutdown() {
  destroy_on_io_thread_shutdown_ = true;
}
| 750 |
// If RequestShutdown has been called and the ports node reports it can shut
// down cleanly, consumes and runs the stored callback (outside the lock).
// Called after every message-pump flush so shutdown completes as soon as the
// node quiesces.
void NodeController::AttemptShutdownIfRequested() {
  base::Closure callback;
  {
    base::AutoLock lock(shutdown_lock_);
    if (shutdown_callback_.is_null())
      return;
    if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
      DVLOG(2) << "Unable to cleanly shut down node " << name_ << ".";
      return;
    }
    callback = shutdown_callback_;
    shutdown_callback_.Reset();
  }

  DCHECK(!callback.is_null());

  callback.Run();
}
| 769 |
| 770 } // namespace edk |
| 771 } // namespace mojo |
OLD | NEW |