| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 602 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 613 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); | 613 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| 614 if (!broker) { | 614 if (!broker) { |
| 615 DVLOG(1) << "Dropping message for unknown peer: " << name; | 615 DVLOG(1) << "Dropping message for unknown peer: " << name; |
| 616 return; | 616 return; |
| 617 } | 617 } |
| 618 broker->RequestIntroduction(name); | 618 broker->RequestIntroduction(name); |
| 619 } | 619 } |
| 620 } | 620 } |
| 621 | 621 |
| 622 void NodeController::AcceptIncomingMessages() { | 622 void NodeController::AcceptIncomingMessages() { |
| 623 while (incoming_messages_flag_) { | 623 { |
| 624 // TODO: We may need to be more careful to avoid starving the rest of the | 624 base::AutoLock lock(messages_lock_); |
| 625 // thread here. Revisit this if it turns out to be a problem. One | 625 if (!incoming_messages_.empty()) { |
| 626 // alternative would be to schedule a task to continue pumping messages | 626 // libstdc++'s deque creates an internal buffer on construction, even when |
| 627 // after flushing once. | 627 // the size is 0. So avoid creating it until it is necessary. |
| 628 std::queue<ports::ScopedMessage> messages; |
| 629 std::swap(messages, incoming_messages_); |
| 630 base::AutoUnlock unlock(messages_lock_); |
| 628 | 631 |
| 629 messages_lock_.Acquire(); | 632 while (!messages.empty()) { |
| 630 if (incoming_messages_.empty()) { | 633 node_->AcceptMessage(std::move(messages.front())); |
| 631 messages_lock_.Release(); | 634 messages.pop(); |
| 632 break; | 635 } |
| 633 } | |
| 634 // libstdc++'s deque creates an internal buffer on construction, even when | |
| 635 // the size is 0. So avoid creating it until it is necessary. | |
| 636 std::queue<ports::ScopedMessage> messages; | |
| 637 std::swap(messages, incoming_messages_); | |
| 638 incoming_messages_flag_.Set(false); | |
| 639 messages_lock_.Release(); | |
| 640 | |
| 641 while (!messages.empty()) { | |
| 642 node_->AcceptMessage(std::move(messages.front())); | |
| 643 messages.pop(); | |
| 644 } | 636 } |
| 645 } | 637 } |
| 638 |
| 646 AttemptShutdownIfRequested(); | 639 AttemptShutdownIfRequested(); |
| 647 } | 640 } |
| 648 | 641 |
| 649 void NodeController::DropAllPeers() { | 642 void NodeController::DropAllPeers() { |
| 650 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 643 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 651 | 644 |
| 652 std::vector<scoped_refptr<NodeChannel>> all_peers; | 645 std::vector<scoped_refptr<NodeChannel>> all_peers; |
| 653 { | 646 { |
| 654 base::AutoLock lock(parent_lock_); | 647 base::AutoLock lock(parent_lock_); |
| 655 if (bootstrap_parent_channel_) { | 648 if (bootstrap_parent_channel_) { |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 693 void NodeController::ForwardMessage(const ports::NodeName& node, | 686 void NodeController::ForwardMessage(const ports::NodeName& node, |
| 694 ports::ScopedMessage message) { | 687 ports::ScopedMessage message) { |
| 695 DCHECK(message); | 688 DCHECK(message); |
| 696 if (node == name_) { | 689 if (node == name_) { |
| 697 // NOTE: We need to avoid re-entering the Node instance within | 690 // NOTE: We need to avoid re-entering the Node instance within |
| 698 // ForwardMessage. Because ForwardMessage is only ever called | 691 // ForwardMessage. Because ForwardMessage is only ever called |
| 699 // (synchronously) in response to Node's ClosePort, SendMessage, or | 692 // (synchronously) in response to Node's ClosePort, SendMessage, or |
| 700 // AcceptMessage, we flush the queue after calling any of those methods. | 693 // AcceptMessage, we flush the queue after calling any of those methods. |
| 701 base::AutoLock lock(messages_lock_); | 694 base::AutoLock lock(messages_lock_); |
| 702 incoming_messages_.emplace(std::move(message)); | 695 incoming_messages_.emplace(std::move(message)); |
| 703 incoming_messages_flag_.Set(true); | |
| 704 } else { | 696 } else { |
| 705 SendPeerMessage(node, std::move(message)); | 697 SendPeerMessage(node, std::move(message)); |
| 706 } | 698 } |
| 707 } | 699 } |
| 708 | 700 |
| 709 void NodeController::BroadcastMessage(ports::ScopedMessage message) { | 701 void NodeController::BroadcastMessage(ports::ScopedMessage message) { |
| 710 CHECK_EQ(message->num_ports(), 0u); | 702 CHECK_EQ(message->num_ports(), 0u); |
| 711 Channel::MessagePtr channel_message = | 703 Channel::MessagePtr channel_message = |
| 712 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); | 704 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); |
| 713 CHECK(!channel_message->has_handles()); | 705 CHECK(!channel_message->has_handles()); |
| (...skipping 489 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1203 shutdown_callback_flag_.Set(false); | 1195 shutdown_callback_flag_.Set(false); |
| 1204 } | 1196 } |
| 1205 | 1197 |
| 1206 DCHECK(!callback.is_null()); | 1198 DCHECK(!callback.is_null()); |
| 1207 | 1199 |
| 1208 callback.Run(); | 1200 callback.Run(); |
| 1209 } | 1201 } |
| 1210 | 1202 |
| 1211 } // namespace edk | 1203 } // namespace edk |
| 1212 } // namespace mojo | 1204 } // namespace mojo |
| OLD | NEW |