Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(612)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2219733005: [mojo-edk] Revert ObserveProxy retransmission behavior (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/node.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 634 matching lines...) Expand 10 before | Expand all | Expand 10 after
645 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 645 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
646 if (!broker) { 646 if (!broker) {
647 DVLOG(1) << "Dropping message for unknown peer: " << name; 647 DVLOG(1) << "Dropping message for unknown peer: " << name;
648 return; 648 return;
649 } 649 }
650 broker->RequestIntroduction(name); 650 broker->RequestIntroduction(name);
651 } 651 }
652 } 652 }
653 653
654 void NodeController::AcceptIncomingMessages() { 654 void NodeController::AcceptIncomingMessages() {
655 { 655 while (incoming_messages_flag_) {
656 base::AutoLock lock(messages_lock_); 656 // TODO: We may need to be more careful to avoid starving the rest of the
657 if (!incoming_messages_.empty()) { 657 // thread here. Revisit this if it turns out to be a problem. One
658 // libstdc++'s deque creates an internal buffer on construction, even when 658 // alternative would be to schedule a task to continue pumping messages
659 // the size is 0. So avoid creating it until it is necessary. 659 // after flushing once.
660 std::queue<ports::ScopedMessage> messages;
661 std::swap(messages, incoming_messages_);
662 base::AutoUnlock unlock(messages_lock_);
663 660
664 while (!messages.empty()) { 661 messages_lock_.Acquire();
665 node_->AcceptMessage(std::move(messages.front())); 662 if (incoming_messages_.empty()) {
666 messages.pop(); 663 messages_lock_.Release();
667 } 664 break;
665 }
666
667 // libstdc++'s deque creates an internal buffer on construction, even when
668 // the size is 0. So avoid creating it until it is necessary.
669 std::queue<ports::ScopedMessage> messages;
670 std::swap(messages, incoming_messages_);
671 incoming_messages_flag_.Set(false);
672 messages_lock_.Release();
673
674 while (!messages.empty()) {
675 node_->AcceptMessage(std::move(messages.front()));
676 messages.pop();
668 } 677 }
669 } 678 }
670 679
671 AttemptShutdownIfRequested(); 680 AttemptShutdownIfRequested();
672 } 681 }
673 682
674 void NodeController::ProcessIncomingMessages() { 683 void NodeController::ProcessIncomingMessages() {
675 RequestContext request_context(RequestContext::Source::SYSTEM); 684 RequestContext request_context(RequestContext::Source::SYSTEM);
676 685
677 { 686 {
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
739 // ForwardMessage. Because ForwardMessage is only ever called 748 // ForwardMessage. Because ForwardMessage is only ever called
740 // (synchronously) in response to Node's ClosePort, SendMessage, or 749 // (synchronously) in response to Node's ClosePort, SendMessage, or
741 // AcceptMessage, we flush the queue after calling any of those methods. 750 // AcceptMessage, we flush the queue after calling any of those methods.
742 base::AutoLock lock(messages_lock_); 751 base::AutoLock lock(messages_lock_);
743 // |io_task_runner_| may be null in tests or processes that don't require 752 // |io_task_runner_| may be null in tests or processes that don't require
744 // multi-process Mojo. 753 // multi-process Mojo.
745 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ && 754 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
746 !incoming_messages_task_posted_; 755 !incoming_messages_task_posted_;
747 incoming_messages_task_posted_ |= schedule_pump_task; 756 incoming_messages_task_posted_ |= schedule_pump_task;
748 incoming_messages_.emplace(std::move(message)); 757 incoming_messages_.emplace(std::move(message));
758 incoming_messages_flag_.Set(true);
749 } else { 759 } else {
750 SendPeerMessage(node, std::move(message)); 760 SendPeerMessage(node, std::move(message));
751 } 761 }
752 762
753 if (schedule_pump_task) { 763 if (schedule_pump_task) {
754 // Normally, the queue is processed after the action that added the local 764 // Normally, the queue is processed after the action that added the local
755 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also 765 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
756 // possible for a local message to be added as a result of a remote message, 766 // possible for a local message to be added as a result of a remote message,
757 // and OnChannelMessage() doesn't process this queue (although 767 // and OnChannelMessage() doesn't process this queue (although
758 // OnPortsMessage() does). There may also be other code paths, now or added 768 // OnPortsMessage() does). There may also be other code paths, now or added
(...skipping 495 matching lines...) Expand 10 before | Expand all | Expand 10 after
1254 1264
1255 void NodeController::AttemptShutdownIfRequested() { 1265 void NodeController::AttemptShutdownIfRequested() {
1256 if (!shutdown_callback_flag_) 1266 if (!shutdown_callback_flag_)
1257 return; 1267 return;
1258 1268
1259 base::Closure callback; 1269 base::Closure callback;
1260 { 1270 {
1261 base::AutoLock lock(shutdown_lock_); 1271 base::AutoLock lock(shutdown_lock_);
1262 if (shutdown_callback_.is_null()) 1272 if (shutdown_callback_.is_null())
1263 return; 1273 return;
1264 if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) { 1274 if (!node_->CanShutdownCleanly(
1275 ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
1265 DVLOG(2) << "Unable to cleanly shut down node " << name_; 1276 DVLOG(2) << "Unable to cleanly shut down node " << name_;
1266 return; 1277 return;
1267 } 1278 }
1268 1279
1269 callback = shutdown_callback_; 1280 callback = shutdown_callback_;
1270 shutdown_callback_.Reset(); 1281 shutdown_callback_.Reset();
1271 shutdown_callback_flag_.Set(false); 1282 shutdown_callback_flag_.Set(false);
1272 } 1283 }
1273 1284
1274 DCHECK(!callback.is_null()); 1285 DCHECK(!callback.is_null());
1275 1286
1276 callback.Run(); 1287 callback.Run();
1277 } 1288 }
1278 1289
1279 } // namespace edk 1290 } // namespace edk
1280 } // namespace mojo 1291 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/node.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698