Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(133)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/node_controller.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/message_loop/message_loop.h"
14 #include "crypto/random.h"
15 #include "mojo/edk/embedder/embedder_internal.h"
16 #include "mojo/edk/embedder/platform_channel_pair.h"
17 #include "mojo/edk/embedder/platform_support.h"
18 #include "mojo/edk/system/core.h"
19 #include "mojo/edk/system/ports_message.h"
20
21 namespace mojo {
22 namespace edk {
23
24 namespace {
25
26 template <typename T>
27 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
28
29 ports::NodeName GetRandomNodeName() {
30 ports::NodeName name;
31 GenerateRandomName(&name);
32 return name;
33 }
34
35 // Used by NodeController to watch for shutdown. Since no IO can happen once
36 // the IO thread is killed, the NodeController can cleanly drop all its peers
37 // at that time.
38 class ThreadDestructionObserver :
39 public base::MessageLoop::DestructionObserver {
40 public:
41 static void Create(scoped_refptr<base::TaskRunner> task_runner,
42 const base::Closure& callback) {
43 if (task_runner->RunsTasksOnCurrentThread()) {
44 // Owns itself.
45 new ThreadDestructionObserver(callback);
46 } else {
47 task_runner->PostTask(FROM_HERE,
48 base::Bind(&Create, task_runner, callback));
49 }
50 }
51
52 private:
53 explicit ThreadDestructionObserver(const base::Closure& callback)
54 : callback_(callback) {
55 base::MessageLoop::current()->AddDestructionObserver(this);
56 }
57
58 ~ThreadDestructionObserver() override {
59 base::MessageLoop::current()->RemoveDestructionObserver(this);
60 }
61
62 // base::MessageLoop::DestructionObserver:
63 void WillDestroyCurrentMessageLoop() override {
64 callback_.Run();
65 delete this;
66 }
67
68 const base::Closure callback_;
69
70 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
71 };
72
73 } // namespace
74
75 NodeController::PendingPortRequest::PendingPortRequest() {}
76
77 NodeController::PendingPortRequest::~PendingPortRequest() {}
78
79 NodeController::ReservedPort::ReservedPort() {}
80
81 NodeController::ReservedPort::~ReservedPort() {}
82
83 NodeController::~NodeController() {}
84
85 NodeController::NodeController(Core* core)
86 : core_(core),
87 name_(GetRandomNodeName()),
88 node_(new ports::Node(name_, this)) {
89 DVLOG(1) << "Initializing node " << name_;
90 }
91
92 void NodeController::SetIOTaskRunner(
93 scoped_refptr<base::TaskRunner> task_runner) {
94 io_task_runner_ = task_runner;
95 ThreadDestructionObserver::Create(
96 io_task_runner_,
97 base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
98 }
99
100 void NodeController::ConnectToChild(base::ProcessHandle process_handle,
101 ScopedPlatformHandle platform_handle) {
102 io_task_runner_->PostTask(
103 FROM_HERE,
104 base::Bind(&NodeController::ConnectToChildOnIOThread,
105 base::Unretained(this),
106 process_handle,
107 base::Passed(&platform_handle)));
108 }
109
110 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
111 io_task_runner_->PostTask(
112 FROM_HERE,
113 base::Bind(&NodeController::ConnectToParentOnIOThread,
114 base::Unretained(this),
115 base::Passed(&platform_handle)));
116 }
117
118 void NodeController::SetPortObserver(
119 const ports::PortRef& port,
120 const scoped_refptr<PortObserver>& observer) {
121 node_->SetUserData(port, observer);
122 }
123
124 void NodeController::ClosePort(const ports::PortRef& port) {
125 SetPortObserver(port, nullptr);
126 int rv = node_->ClosePort(port);
127 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
128
129 AcceptIncomingMessages();
130 }
131
132 int NodeController::SendMessage(const ports::PortRef& port,
133 scoped_ptr<PortsMessage>* message) {
134 ports::ScopedMessage ports_message(message->release());
135 int rv = node_->SendMessage(port, &ports_message);
136 if (rv != ports::OK) {
137 DCHECK(ports_message);
138 message->reset(static_cast<PortsMessage*>(ports_message.release()));
139 }
140
141 AcceptIncomingMessages();
142 return rv;
143 }
144
145 void NodeController::ReservePort(const std::string& token,
146 const ReservePortCallback& callback) {
147 ports::PortRef port;
148 node_->CreateUninitializedPort(&port);
149
150 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
151 << token;
152
153 base::AutoLock lock(reserved_ports_lock_);
154 ReservedPort reservation;
155 reservation.local_port = port;
156 reservation.callback = callback;
157 reserved_ports_.insert(std::make_pair(token, reservation));
158 }
159
160 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
161 size_t num_bytes) {
162 // TODO: Broker through the parent over a sync channel. :(
163 return internal::g_platform_support->CreateSharedBuffer(num_bytes);
164 }
165
166 void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
167 const std::string& token,
168 const base::Closure& callback) {
169 io_task_runner_->PostTask(
170 FROM_HERE,
171 base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
172 base::Unretained(this), local_port, token, callback));
173 }
174
175 void NodeController::ConnectReservedPorts(const std::string& token1,
176 const std::string& token2) {
177 ReservedPort port1;
178 ReservedPort port2;
179 {
180 base::AutoLock lock(reserved_ports_lock_);
181 auto it1 = reserved_ports_.find(token1);
182 if (it1 == reserved_ports_.end())
183 return;
184 auto it2 = reserved_ports_.find(token2);
185 if (it2 == reserved_ports_.end())
186 return;
187 port1 = it1->second;
188 port2 = it2->second;
189 reserved_ports_.erase(it1);
190 reserved_ports_.erase(it2);
191 }
192
193 node_->InitializePort(port1.local_port, name_, port2.local_port.name());
194 node_->InitializePort(port2.local_port, name_, port1.local_port.name());
195 port1.callback.Run(port1.local_port);
196 port2.callback.Run(port2.local_port);
197 }
198
199 void NodeController::RequestShutdown(const base::Closure& callback) {
200 {
201 base::AutoLock lock(shutdown_lock_);
202 shutdown_callback_ = callback;
203 }
204
205 AttemptShutdownIfRequested();
206 }
207
208 void NodeController::ConnectToChildOnIOThread(
209 base::ProcessHandle process_handle,
210 ScopedPlatformHandle platform_handle) {
211 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
212
213 scoped_refptr<NodeChannel> channel =
214 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
215
216 ports::NodeName token;
217 GenerateRandomName(&token);
218
219 channel->SetRemoteNodeName(token);
220 channel->SetRemoteProcessHandle(process_handle);
221 channel->Start();
222 channel->AcceptChild(name_, token);
223
224 pending_children_.insert(std::make_pair(token, channel));
225 }
226
227 void NodeController::ConnectToParentOnIOThread(
228 ScopedPlatformHandle platform_handle) {
229 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
230
231 base::AutoLock lock(parent_lock_);
232 DCHECK(parent_name_ == ports::kInvalidNodeName);
233
234 // At this point we don't know the parent's name, so we can't yet insert it
235 // into our |peers_| map. That will happen as soon as we receive an
236 // AcceptChild message from them.
237 bootstrap_parent_channel_ =
238 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
239 bootstrap_parent_channel_->Start();
240 }
241
242 void NodeController::RequestParentPortConnectionOnIOThread(
243 const ports::PortRef& local_port,
244 const std::string& token,
245 const base::Closure& callback) {
246 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
247
248 scoped_refptr<NodeChannel> parent = GetParentChannel();
249 if (!parent) {
250 PendingPortRequest request;
251 request.token = token;
252 request.local_port = local_port;
253 request.callback = callback;
254 pending_port_requests_.push_back(request);
255 return;
256 }
257
258 pending_port_connections_.insert(std::make_pair(local_port.name(), callback));
259 parent->RequestPortConnection(local_port.name(), token);
260 }
261
262 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
263 const ports::NodeName& name) {
264 base::AutoLock lock(peers_lock_);
265 auto it = peers_.find(name);
266 if (it == peers_.end())
267 return nullptr;
268 return it->second;
269 }
270
271 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
272 ports::NodeName parent_name;
273 {
274 base::AutoLock lock(parent_lock_);
275 parent_name = parent_name_;
276 }
277 return GetPeerChannel(parent_name);
278 }
279
280 void NodeController::AddPeer(const ports::NodeName& name,
281 scoped_refptr<NodeChannel> channel,
282 bool start_channel) {
283 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
284
285 DCHECK(name != ports::kInvalidNodeName);
286 DCHECK(channel);
287
288 channel->SetRemoteNodeName(name);
289
290 base::AutoLock lock(peers_lock_);
291 if (peers_.find(name) != peers_.end()) {
292 // This can happen normally if two nodes race to be introduced to each
293 // other. The losing pipe will be silently closed and introduction should
294 // not be affected.
295 DVLOG(1) << "Ignoring duplicate peer name " << name;
296 return;
297 }
298
299 auto result = peers_.insert(std::make_pair(name, channel));
300 DCHECK(result.second);
301
302 DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
303
304 if (start_channel)
305 channel->Start();
306
307 // Flush any queued message we need to deliver to this node.
308 OutgoingMessageQueue pending_messages;
309 auto it = pending_peer_messages_.find(name);
310 if (it != pending_peer_messages_.end()) {
311 auto& message_queue = it->second;
312 while (!message_queue.empty()) {
313 ports::ScopedMessage message = std::move(message_queue.front());
314 channel->PortsMessage(
315 static_cast<PortsMessage*>(message.get())->TakeChannelMessage());
316 message_queue.pop();
317 }
318 pending_peer_messages_.erase(it);
319 }
320 }
321
322 void NodeController::DropPeer(const ports::NodeName& name) {
323 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
324
325 {
326 base::AutoLock lock(peers_lock_);
327 auto it = peers_.find(name);
328
329 if (it != peers_.end()) {
330 ports::NodeName peer = it->first;
331 peers_.erase(it);
332 DVLOG(1) << "Dropped peer " << peer;
333 }
334
335 pending_peer_messages_.erase(name);
336 pending_children_.erase(name);
337 }
338
339 node_->LostConnectionToNode(name);
340 }
341
342 void NodeController::SendPeerMessage(const ports::NodeName& name,
343 ports::ScopedMessage message) {
344 PortsMessage* ports_message = static_cast<PortsMessage*>(message.get());
345
346 #if defined(OS_WIN)
347 // If we're sending a message with handles and we're not the parent,
348 // relay the message through the parent.
349 if (ports_message->has_handles()) {
350 scoped_refptr<NodeChannel> parent = GetParentChannel();
351 if (parent) {
352 parent->RelayPortsMessage(name, ports_message->TakeChannelMessage());
353 return;
354 }
355 }
356 #endif
357
358 scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
359 if (peer) {
360 peer->PortsMessage(ports_message->TakeChannelMessage());
361 return;
362 }
363
364 // If we don't know who the peer is, queue the message for delivery. If this
365 // is the first message queued for the peer, we also ask the parent to
366 // introduce us to them.
367
368 bool needs_introduction = false;
369 {
370 base::AutoLock lock(peers_lock_);
371 auto& queue = pending_peer_messages_[name];
372 needs_introduction = queue.empty();
373 queue.emplace(std::move(message));
374 }
375
376 if (needs_introduction) {
377 scoped_refptr<NodeChannel> parent = GetParentChannel();
378 if (!parent) {
379 DVLOG(1) << "Dropping message for unknown peer: " << name;
380 return;
381 }
382 parent->RequestIntroduction(name);
383 }
384 }
385
386 void NodeController::AcceptIncomingMessages() {
387 std::queue<ports::ScopedMessage> messages;
388 for (;;) {
389 // TODO: We may need to be more careful to avoid starving the rest of the
390 // thread here. Revisit this if it turns out to be a problem. One
391 // alternative would be to schedule a task to continue pumping messages
392 // after flushing once.
393
394 {
395 base::AutoLock lock(messages_lock_);
396 if (incoming_messages_.empty())
397 break;
398 std::swap(messages, incoming_messages_);
399 }
400
401 while (!messages.empty()) {
402 node_->AcceptMessage(std::move(messages.front()));
403 messages.pop();
404 }
405 }
406 AttemptShutdownIfRequested();
407 }
408
409 void NodeController::DropAllPeers() {
410 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
411
412 {
413 base::AutoLock lock(parent_lock_);
414 if (bootstrap_parent_channel_) {
415 bootstrap_parent_channel_->ShutDown();
416 bootstrap_parent_channel_ = nullptr;
417 }
418 }
419
420 std::vector<scoped_refptr<NodeChannel>> all_peers;
421 {
422 base::AutoLock lock(peers_lock_);
423 for (const auto& peer : peers_)
424 all_peers.push_back(peer.second);
425 for (const auto& peer : pending_children_)
426 all_peers.push_back(peer.second);
427 peers_.clear();
428 pending_children_.clear();
429 pending_peer_messages_.clear();
430 }
431
432 for (const auto& peer : all_peers)
433 peer->ShutDown();
434
435 if (destroy_on_io_thread_shutdown_)
436 delete this;
437 }
438
439 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
440 GenerateRandomName(port_name);
441 }
442
443 void NodeController::AllocMessage(size_t num_header_bytes,
444 ports::ScopedMessage* message) {
445 message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
446 }
447
448 void NodeController::ForwardMessage(const ports::NodeName& node,
449 ports::ScopedMessage message) {
450 if (node == name_) {
451 // NOTE: We need to avoid re-entering the Node instance within
452 // ForwardMessage. Because ForwardMessage is only ever called
453 // (synchronously) in response to Node's ClosePort, SendMessage, or
454 // AcceptMessage, we flush the queue after calling any of those methods.
455 base::AutoLock lock(messages_lock_);
456 incoming_messages_.emplace(std::move(message));
457 } else {
458 SendPeerMessage(node, std::move(message));
459 }
460 }
461
462 void NodeController::PortStatusChanged(const ports::PortRef& port) {
463 scoped_refptr<ports::UserData> user_data;
464 node_->GetUserData(port, &user_data);
465
466 PortObserver* observer = static_cast<PortObserver*>(user_data.get());
467 if (observer) {
468 observer->OnPortStatusChanged();
469 } else {
470 DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
471 << "doesn't have an observer.";
472 }
473 }
474
475 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
476 const ports::NodeName& parent_name,
477 const ports::NodeName& token) {
478 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
479
480 scoped_refptr<NodeChannel> parent;
481 {
482 base::AutoLock lock(parent_lock_);
483 if (!bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) {
484 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
485 DropPeer(from_node);
486 return;
487 }
488
489 parent_name_ = parent_name;
490 parent = bootstrap_parent_channel_;
491 bootstrap_parent_channel_ = nullptr;
492 }
493
494 parent->AcceptParent(token, name_);
495 for (const auto& request : pending_port_requests_) {
496 pending_port_connections_.insert(
497 std::make_pair(request.local_port.name(), request.callback));
498 parent->RequestPortConnection(request.local_port.name(), request.token);
499 }
500 pending_port_requests_.clear();
501
502 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
503
504 AddPeer(parent_name_, parent, false /* start_channel */);
505 }
506
507 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
508 const ports::NodeName& token,
509 const ports::NodeName& child_name) {
510 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
511
512 auto it = pending_children_.find(from_node);
513 if (it == pending_children_.end() || token != from_node) {
514 DLOG(ERROR) << "Received unexpected AcceptParent message from "
515 << from_node;
516 DropPeer(from_node);
517 return;
518 }
519
520 scoped_refptr<NodeChannel> channel = it->second;
521 pending_children_.erase(it);
522
523 DCHECK(channel);
524
525 DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
526
527 AddPeer(child_name, channel, false /* start_channel */);
528 }
529
530 void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
531 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
532
533 void* data;
534 size_t num_data_bytes;
535 NodeChannel::GetPortsMessageData(
536 channel_message.get(), &data, &num_data_bytes);
537
538 size_t num_header_bytes, num_payload_bytes, num_ports_bytes;
539 ports::Message::Parse(data,
540 num_data_bytes,
541 &num_header_bytes,
542 &num_payload_bytes,
543 &num_ports_bytes);
544
545 CHECK(channel_message);
546 ports::ScopedMessage message(
547 new PortsMessage(num_header_bytes,
548 num_payload_bytes,
549 num_ports_bytes,
550 std::move(channel_message)));
551
552 node_->AcceptMessage(std::move(message));
553 AcceptIncomingMessages();
554 AttemptShutdownIfRequested();
555 }
556
557 void NodeController::OnRequestPortConnection(
558 const ports::NodeName& from_node,
559 const ports::PortName& connector_port_name,
560 const std::string& token) {
561 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
562
563 DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
564 << token << " and port " << connector_port_name << "@" << from_node;
565
566 ReservePortCallback callback;
567 ports::PortRef local_port;
568 {
569 base::AutoLock lock(reserved_ports_lock_);
570 auto it = reserved_ports_.find(token);
571 if (it == reserved_ports_.end()) {
572 DVLOG(1) << "Ignoring request to connect to port for unknown token "
573 << token;
574 return;
575 }
576 local_port = it->second.local_port;
577 callback = it->second.callback;
578 reserved_ports_.erase(it);
579 }
580
581 DCHECK(!callback.is_null());
582
583 scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
584 if (!peer) {
585 DVLOG(1) << "Ignoring request to connect to port from unknown node "
586 << from_node;
587 return;
588 }
589
590 // This reserved port should not have been initialized yet.
591 CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
592 connector_port_name));
593
594 peer->ConnectToPort(local_port.name(), connector_port_name);
595 callback.Run(local_port);
596 }
597
598 void NodeController::OnConnectToPort(
599 const ports::NodeName& from_node,
600 const ports::PortName& connector_port_name,
601 const ports::PortName& connectee_port_name) {
602 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
603
604 DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
605 << connectee_port_name << " to port " << connector_port_name << "@"
606 << from_node;
607
608 ports::PortRef connectee_port;
609 int rv = node_->GetPort(connectee_port_name, &connectee_port);
610 if (rv != ports::OK) {
611 DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
612 << connectee_port_name;
613 return;
614 }
615
616 // It's OK if this port has already been initialized. This message is only
617 // sent by the remote peer to ensure the port is ready before it starts
618 // us sending messages to it.
619 ports::PortStatus port_status;
620 rv = node_->GetStatus(connectee_port, &port_status);
621 if (rv == ports::OK) {
622 DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
623 << connectee_port_name;
624 return;
625 }
626
627 CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
628 connector_port_name));
629
630 auto it = pending_port_connections_.find(connectee_port_name);
631 DCHECK(it != pending_port_connections_.end());
632 it->second.Run();
633 pending_port_connections_.erase(it);
634 }
635
636 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
637 const ports::NodeName& name) {
638 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
639
640 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
641 if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
642 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
643 << from_node;
644 DropPeer(from_node);
645 return;
646 }
647
648 if (GetParentChannel() != nullptr) {
649 DLOG(ERROR) << "Non-parent node cannot introduce peers to each other.";
650 return;
651 }
652
653 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
654 if (!new_friend) {
655 // We don't know who they're talking about!
656 requestor->Introduce(name, ScopedPlatformHandle());
657 } else {
658 PlatformChannelPair new_channel;
659 requestor->Introduce(name, new_channel.PassServerHandle());
660 new_friend->Introduce(from_node, new_channel.PassClientHandle());
661 }
662 }
663
664 void NodeController::OnIntroduce(const ports::NodeName& from_node,
665 const ports::NodeName& name,
666 ScopedPlatformHandle channel_handle) {
667 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
668
669 {
670 base::AutoLock lock(parent_lock_);
671 if (from_node != parent_name_) {
672 DLOG(ERROR) << "Received unexpected Introduce message from node "
673 << from_node;
674 DropPeer(from_node);
675 return;
676 }
677 }
678
679 if (!channel_handle.is_valid()) {
680 DLOG(ERROR) << "Could not be introduced to peer " << name;
681 base::AutoLock lock(peers_lock_);
682 pending_peer_messages_.erase(name);
683 return;
684 }
685
686 scoped_refptr<NodeChannel> channel =
687 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_);
688
689 DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
690 AddPeer(name, channel, true /* start_channel */);
691 }
692
693 #if defined(OS_WIN)
694 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
695 base::ProcessHandle from_process,
696 const ports::NodeName& destination,
697 Channel::MessagePtr message) {
698 scoped_refptr<NodeChannel> parent = GetParentChannel();
699 if (parent) {
700 // Only the parent should be asked to relay a message.
701 DLOG(ERROR) << "Non-parent refusing to relay message.";
702 DropPeer(from_node);
703 return;
704 }
705
706 // The parent should always know which process this came from.
707 DCHECK(from_process != base::kNullProcessHandle);
708
709 // Duplicate the handles to this (the parent) process. If the message is
710 // destined for another child process, the handles will be duplicated to
711 // that process before going out (see NodeChannel::WriteChannelMessage).
712 //
713 // TODO: We could avoid double-duplication.
714 for (size_t i = 0; i < message->num_handles(); ++i) {
715 BOOL result = DuplicateHandle(
716 from_process, message->handles()[i].handle,
717 base::GetCurrentProcessHandle(),
718 reinterpret_cast<HANDLE*>(message->handles() + i),
719 0, FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
720 DCHECK(result);
721 }
722 if (destination == name_) {
723 // Great, we can deliver this message locally.
724 OnPortsMessage(std::move(message));
725 return;
726 }
727
728 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
729 if (peer)
730 peer->PortsMessage(std::move(message));
731 else
732 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
733 }
734 #endif
735
736 void NodeController::OnChannelError(const ports::NodeName& from_node) {
737 if (io_task_runner_->RunsTasksOnCurrentThread()) {
738 DropPeer(from_node);
739 } else {
740 io_task_runner_->PostTask(
741 FROM_HERE,
742 base::Bind(&NodeController::DropPeer, base::Unretained(this),
743 from_node));
744 }
745 }
746
747 void NodeController::DestroyOnIOThreadShutdown() {
748 destroy_on_io_thread_shutdown_ = true;
749 }
750
751 void NodeController::AttemptShutdownIfRequested() {
752 base::Closure callback;
753 {
754 base::AutoLock lock(shutdown_lock_);
755 if (shutdown_callback_.is_null())
756 return;
757 if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
758 DVLOG(2) << "Unable to cleanly shut down node " << name_ << ".";
759 return;
760 }
761 callback = shutdown_callback_;
762 shutdown_callback_.Reset();
763 }
764
765 DCHECK(!callback.is_null());
766
767 callback.Run();
768 }
769
770 } // namespace edk
771 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698