Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(494)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2153733002: [mojo-edk] Prevent AcceptIncomingMessages flushing aggressively (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: -_- Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/watch_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 634 matching lines...) Expand 10 before | Expand all | Expand 10 after
645 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 645 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
646 if (!broker) { 646 if (!broker) {
647 DVLOG(1) << "Dropping message for unknown peer: " << name; 647 DVLOG(1) << "Dropping message for unknown peer: " << name;
648 return; 648 return;
649 } 649 }
650 broker->RequestIntroduction(name); 650 broker->RequestIntroduction(name);
651 } 651 }
652 } 652 }
653 653
654 void NodeController::AcceptIncomingMessages() { 654 void NodeController::AcceptIncomingMessages() {
655 while (incoming_messages_flag_) { 655 {
656 // TODO: We may need to be more careful to avoid starving the rest of the 656 base::AutoLock lock(messages_lock_);
657 // thread here. Revisit this if it turns out to be a problem. One 657 if (!incoming_messages_.empty()) {
658 // alternative would be to schedule a task to continue pumping messages 658 // libstdc++'s deque creates an internal buffer on construction, even when
659 // after flushing once. 659 // the size is 0. So avoid creating it until it is necessary.
660 std::queue<ports::ScopedMessage> messages;
661 std::swap(messages, incoming_messages_);
662 base::AutoUnlock unlock(messages_lock_);
660 663
661 messages_lock_.Acquire(); 664 while (!messages.empty()) {
662 if (incoming_messages_.empty()) { 665 node_->AcceptMessage(std::move(messages.front()));
663 messages_lock_.Release(); 666 messages.pop();
664 break; 667 }
665 }
666 // libstdc++'s deque creates an internal buffer on construction, even when
667 // the size is 0. So avoid creating it until it is necessary.
668 std::queue<ports::ScopedMessage> messages;
669 std::swap(messages, incoming_messages_);
670 incoming_messages_flag_.Set(false);
671 messages_lock_.Release();
672
673 while (!messages.empty()) {
674 node_->AcceptMessage(std::move(messages.front()));
675 messages.pop();
676 } 668 }
677 } 669 }
670
678 AttemptShutdownIfRequested(); 671 AttemptShutdownIfRequested();
679 } 672 }
680 673
681 void NodeController::ProcessIncomingMessages() { 674 void NodeController::ProcessIncomingMessages() {
682 RequestContext request_context(RequestContext::Source::SYSTEM); 675 RequestContext request_context(RequestContext::Source::SYSTEM);
683 676
684 { 677 {
685 base::AutoLock lock(messages_lock_); 678 base::AutoLock lock(messages_lock_);
686 // Allow a new incoming messages processing task to be posted. This can't be 679 // Allow a new incoming messages processing task to be posted. This can't be
687 // done after AcceptIncomingMessages() otherwise a message might be missed. 680 // done after AcceptIncomingMessages() otherwise a message might be missed.
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
746 // ForwardMessage. Because ForwardMessage is only ever called 739 // ForwardMessage. Because ForwardMessage is only ever called
747 // (synchronously) in response to Node's ClosePort, SendMessage, or 740 // (synchronously) in response to Node's ClosePort, SendMessage, or
748 // AcceptMessage, we flush the queue after calling any of those methods. 741 // AcceptMessage, we flush the queue after calling any of those methods.
749 base::AutoLock lock(messages_lock_); 742 base::AutoLock lock(messages_lock_);
750 // |io_task_runner_| may be null in tests or processes that don't require 743 // |io_task_runner_| may be null in tests or processes that don't require
751 // multi-process Mojo. 744 // multi-process Mojo.
752 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ && 745 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
753 !incoming_messages_task_posted_; 746 !incoming_messages_task_posted_;
754 incoming_messages_task_posted_ |= schedule_pump_task; 747 incoming_messages_task_posted_ |= schedule_pump_task;
755 incoming_messages_.emplace(std::move(message)); 748 incoming_messages_.emplace(std::move(message));
756 incoming_messages_flag_.Set(true);
757 } else { 749 } else {
758 SendPeerMessage(node, std::move(message)); 750 SendPeerMessage(node, std::move(message));
759 } 751 }
760 752
761 if (schedule_pump_task) { 753 if (schedule_pump_task) {
762 // Normally, the queue is processed after the action that added the local 754 // Normally, the queue is processed after the action that added the local
763 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also 755 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
764 // possible for a local message to be added as a result of a remote message, 756 // possible for a local message to be added as a result of a remote message,
765 // and OnChannelMessage() doesn't process this queue (although 757 // and OnChannelMessage() doesn't process this queue (although
766 // OnPortsMessage() does). There may also be other code paths, now or added 758 // OnPortsMessage() does). There may also be other code paths, now or added
(...skipping 512 matching lines...) Expand 10 before | Expand all | Expand 10 after
1279 shutdown_callback_flag_.Set(false); 1271 shutdown_callback_flag_.Set(false);
1280 } 1272 }
1281 1273
1282 DCHECK(!callback.is_null()); 1274 DCHECK(!callback.is_null());
1283 1275
1284 callback.Run(); 1276 callback.Run();
1285 } 1277 }
1286 1278
1287 } // namespace edk 1279 } // namespace edk
1288 } // namespace mojo 1280 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/watch_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698