Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(406)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2209733002: [mojo-edk] Prevent AcceptIncomingMessages flushing aggressively (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@2785
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/watch_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 602 matching lines...) Expand 10 before | Expand all | Expand 10 after
613 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 613 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
614 if (!broker) { 614 if (!broker) {
615 DVLOG(1) << "Dropping message for unknown peer: " << name; 615 DVLOG(1) << "Dropping message for unknown peer: " << name;
616 return; 616 return;
617 } 617 }
618 broker->RequestIntroduction(name); 618 broker->RequestIntroduction(name);
619 } 619 }
620 } 620 }
621 621
622 void NodeController::AcceptIncomingMessages() { 622 void NodeController::AcceptIncomingMessages() {
623 while (incoming_messages_flag_) { 623 {
624 // TODO: We may need to be more careful to avoid starving the rest of the 624 base::AutoLock lock(messages_lock_);
625 // thread here. Revisit this if it turns out to be a problem. One 625 if (!incoming_messages_.empty()) {
626 // alternative would be to schedule a task to continue pumping messages 626 // libstdc++'s deque creates an internal buffer on construction, even when
627 // after flushing once. 627 // the size is 0. So avoid creating it until it is necessary.
628 std::queue<ports::ScopedMessage> messages;
629 std::swap(messages, incoming_messages_);
630 base::AutoUnlock unlock(messages_lock_);
628 631
629 messages_lock_.Acquire(); 632 while (!messages.empty()) {
630 if (incoming_messages_.empty()) { 633 node_->AcceptMessage(std::move(messages.front()));
631 messages_lock_.Release(); 634 messages.pop();
632 break; 635 }
633 }
634 // libstdc++'s deque creates an internal buffer on construction, even when
635 // the size is 0. So avoid creating it until it is necessary.
636 std::queue<ports::ScopedMessage> messages;
637 std::swap(messages, incoming_messages_);
638 incoming_messages_flag_.Set(false);
639 messages_lock_.Release();
640
641 while (!messages.empty()) {
642 node_->AcceptMessage(std::move(messages.front()));
643 messages.pop();
644 } 636 }
645 } 637 }
638
646 AttemptShutdownIfRequested(); 639 AttemptShutdownIfRequested();
647 } 640 }
648 641
649 void NodeController::DropAllPeers() { 642 void NodeController::DropAllPeers() {
650 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 643 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
651 644
652 std::vector<scoped_refptr<NodeChannel>> all_peers; 645 std::vector<scoped_refptr<NodeChannel>> all_peers;
653 { 646 {
654 base::AutoLock lock(parent_lock_); 647 base::AutoLock lock(parent_lock_);
655 if (bootstrap_parent_channel_) { 648 if (bootstrap_parent_channel_) {
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
693 void NodeController::ForwardMessage(const ports::NodeName& node, 686 void NodeController::ForwardMessage(const ports::NodeName& node,
694 ports::ScopedMessage message) { 687 ports::ScopedMessage message) {
695 DCHECK(message); 688 DCHECK(message);
696 if (node == name_) { 689 if (node == name_) {
697 // NOTE: We need to avoid re-entering the Node instance within 690 // NOTE: We need to avoid re-entering the Node instance within
698 // ForwardMessage. Because ForwardMessage is only ever called 691 // ForwardMessage. Because ForwardMessage is only ever called
699 // (synchronously) in response to Node's ClosePort, SendMessage, or 692 // (synchronously) in response to Node's ClosePort, SendMessage, or
700 // AcceptMessage, we flush the queue after calling any of those methods. 693 // AcceptMessage, we flush the queue after calling any of those methods.
701 base::AutoLock lock(messages_lock_); 694 base::AutoLock lock(messages_lock_);
702 incoming_messages_.emplace(std::move(message)); 695 incoming_messages_.emplace(std::move(message));
703 incoming_messages_flag_.Set(true);
704 } else { 696 } else {
705 SendPeerMessage(node, std::move(message)); 697 SendPeerMessage(node, std::move(message));
706 } 698 }
707 } 699 }
708 700
709 void NodeController::BroadcastMessage(ports::ScopedMessage message) { 701 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
710 CHECK_EQ(message->num_ports(), 0u); 702 CHECK_EQ(message->num_ports(), 0u);
711 Channel::MessagePtr channel_message = 703 Channel::MessagePtr channel_message =
712 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); 704 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
713 CHECK(!channel_message->has_handles()); 705 CHECK(!channel_message->has_handles());
(...skipping 489 matching lines...) Expand 10 before | Expand all | Expand 10 after
1203 shutdown_callback_flag_.Set(false); 1195 shutdown_callback_flag_.Set(false);
1204 } 1196 }
1205 1197
1206 DCHECK(!callback.is_null()); 1198 DCHECK(!callback.is_null());
1207 1199
1208 callback.Run(); 1200 callback.Run();
1209 } 1201 }
1210 1202
1211 } // namespace edk 1203 } // namespace edk
1212 } // namespace mojo 1204 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/watch_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698