Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2138343002: [mojo-edk] Ensure there is only one ProcessIncomingMessages() task posted to the IO thread. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Comment changes. Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 633 matching lines...) Expand 10 before | Expand all | Expand 10 after
644 while (!messages.empty()) { 644 while (!messages.empty()) {
645 node_->AcceptMessage(std::move(messages.front())); 645 node_->AcceptMessage(std::move(messages.front()));
646 messages.pop(); 646 messages.pop();
647 } 647 }
648 } 648 }
649 AttemptShutdownIfRequested(); 649 AttemptShutdownIfRequested();
650 } 650 }
651 651
652 void NodeController::ProcessIncomingMessages() { 652 void NodeController::ProcessIncomingMessages() {
653 RequestContext request_context(RequestContext::Source::SYSTEM); 653 RequestContext request_context(RequestContext::Source::SYSTEM);
654
655 {
656 base::AutoLock lock(messages_lock_);
657 // Allow a new incoming messages processing task to be posted. This can't be
658 // done after AcceptIncomingMessages() otherwise a message might be missed.
659 // Doing it here may result in at most two tasks existing at the same time;
660 // this running one, and one pending in the task runner.
661 incoming_messages_task_posted_ = false;
662 }
663
654 AcceptIncomingMessages(); 664 AcceptIncomingMessages();
655 } 665 }
656 666
657 void NodeController::DropAllPeers() { 667 void NodeController::DropAllPeers() {
658 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 668 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
659 669
660 std::vector<scoped_refptr<NodeChannel>> all_peers; 670 std::vector<scoped_refptr<NodeChannel>> all_peers;
661 { 671 {
662 base::AutoLock lock(parent_lock_); 672 base::AutoLock lock(parent_lock_);
663 if (bootstrap_parent_channel_) { 673 if (bootstrap_parent_channel_) {
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
701 void NodeController::ForwardMessage(const ports::NodeName& node, 711 void NodeController::ForwardMessage(const ports::NodeName& node,
702 ports::ScopedMessage message) { 712 ports::ScopedMessage message) {
703 DCHECK(message); 713 DCHECK(message);
704 bool schedule_pump_task = false; 714 bool schedule_pump_task = false;
705 if (node == name_) { 715 if (node == name_) {
706 // NOTE: We need to avoid re-entering the Node instance within 716 // NOTE: We need to avoid re-entering the Node instance within
707 // ForwardMessage. Because ForwardMessage is only ever called 717 // ForwardMessage. Because ForwardMessage is only ever called
708 // (synchronously) in response to Node's ClosePort, SendMessage, or 718 // (synchronously) in response to Node's ClosePort, SendMessage, or
709 // AcceptMessage, we flush the queue after calling any of those methods. 719 // AcceptMessage, we flush the queue after calling any of those methods.
710 base::AutoLock lock(messages_lock_); 720 base::AutoLock lock(messages_lock_);
711 schedule_pump_task = incoming_messages_.empty(); 721 // |io_task_runner_| may be null in tests or processes that don't require
722 // multi-process Mojo.
723 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
724 !incoming_messages_task_posted_;
725 incoming_messages_task_posted_ |= schedule_pump_task;
712 incoming_messages_.emplace(std::move(message)); 726 incoming_messages_.emplace(std::move(message));
713 incoming_messages_flag_.Set(true); 727 incoming_messages_flag_.Set(true);
714 } else { 728 } else {
715 SendPeerMessage(node, std::move(message)); 729 SendPeerMessage(node, std::move(message));
716 } 730 }
717 731
718 // |io_task_runner_| may be null in tests or processes that don't require 732 if (schedule_pump_task) {
719 // multi-process Mojo.
720 if (schedule_pump_task && io_task_runner_) {
721 // Normally, the queue is processed after the action that added the local 733 // Normally, the queue is processed after the action that added the local
722 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also 734 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
723 // possible for a local message to be added as a result of a remote message, 735 // possible for a local message to be added as a result of a remote message,
724 // and OnChannelMessage() doesn't process this queue (although 736 // and OnChannelMessage() doesn't process this queue (although
725 // OnPortsMessage() does). There may also be other code paths, now or added 737 // OnPortsMessage() does). There may also be other code paths, now or added
726 // in the future, which cause local messages to be added but don't process 738 // in the future, which cause local messages to be added but don't process
727 // this message queue. 739 // this message queue.
728 // 740 //
729 // Instead of adding a call to AcceptIncomingMessages() on every possible 741 // Instead of adding a call to AcceptIncomingMessages() on every possible
730 // code path, post a task to the IO thread to process the queue. If the 742 // code path, post a task to the IO thread to process the queue. If the
(...skipping 506 matching lines...) Expand 10 before | Expand all | Expand 10 after
1237 shutdown_callback_flag_.Set(false); 1249 shutdown_callback_flag_.Set(false);
1238 } 1250 }
1239 1251
1240 DCHECK(!callback.is_null()); 1252 DCHECK(!callback.is_null());
1241 1253
1242 callback.Run(); 1254 callback.Run();
1243 } 1255 }
1244 1256
1245 } // namespace edk 1257 } // namespace edk
1246 } // namespace mojo 1258 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698