| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 633 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 644 while (!messages.empty()) { | 644 while (!messages.empty()) { |
| 645 node_->AcceptMessage(std::move(messages.front())); | 645 node_->AcceptMessage(std::move(messages.front())); |
| 646 messages.pop(); | 646 messages.pop(); |
| 647 } | 647 } |
| 648 } | 648 } |
| 649 AttemptShutdownIfRequested(); | 649 AttemptShutdownIfRequested(); |
| 650 } | 650 } |
| 651 | 651 |
| 652 void NodeController::ProcessIncomingMessages() { | 652 void NodeController::ProcessIncomingMessages() { |
| 653 RequestContext request_context(RequestContext::Source::SYSTEM); | 653 RequestContext request_context(RequestContext::Source::SYSTEM); |
| 654 |
| 655 { |
| 656 base::AutoLock lock(messages_lock_); |
| 657 // Allow a new incoming messages processing task to be posted. This can't be |
| 658 // done after AcceptIncomingMessages() otherwise a message might be missed. |
| 659 // Doing it here may result in at most two tasks existing at the same time; |
| 660 // this running one, and one pending in the task runner. |
| 661 incoming_messages_task_posted_ = false; |
| 662 } |
| 663 |
| 654 AcceptIncomingMessages(); | 664 AcceptIncomingMessages(); |
| 655 } | 665 } |
| 656 | 666 |
| 657 void NodeController::DropAllPeers() { | 667 void NodeController::DropAllPeers() { |
| 658 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 668 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 659 | 669 |
| 660 std::vector<scoped_refptr<NodeChannel>> all_peers; | 670 std::vector<scoped_refptr<NodeChannel>> all_peers; |
| 661 { | 671 { |
| 662 base::AutoLock lock(parent_lock_); | 672 base::AutoLock lock(parent_lock_); |
| 663 if (bootstrap_parent_channel_) { | 673 if (bootstrap_parent_channel_) { |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 701 void NodeController::ForwardMessage(const ports::NodeName& node, | 711 void NodeController::ForwardMessage(const ports::NodeName& node, |
| 702 ports::ScopedMessage message) { | 712 ports::ScopedMessage message) { |
| 703 DCHECK(message); | 713 DCHECK(message); |
| 704 bool schedule_pump_task = false; | 714 bool schedule_pump_task = false; |
| 705 if (node == name_) { | 715 if (node == name_) { |
| 706 // NOTE: We need to avoid re-entering the Node instance within | 716 // NOTE: We need to avoid re-entering the Node instance within |
| 707 // ForwardMessage. Because ForwardMessage is only ever called | 717 // ForwardMessage. Because ForwardMessage is only ever called |
| 708 // (synchronously) in response to Node's ClosePort, SendMessage, or | 718 // (synchronously) in response to Node's ClosePort, SendMessage, or |
| 709 // AcceptMessage, we flush the queue after calling any of those methods. | 719 // AcceptMessage, we flush the queue after calling any of those methods. |
| 710 base::AutoLock lock(messages_lock_); | 720 base::AutoLock lock(messages_lock_); |
| 711 schedule_pump_task = incoming_messages_.empty(); | 721 // |io_task_runner_| may be null in tests or processes that don't require |
| 722 // multi-process Mojo. |
| 723 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ && |
| 724 !incoming_messages_task_posted_; |
| 725 incoming_messages_task_posted_ |= schedule_pump_task; |
| 712 incoming_messages_.emplace(std::move(message)); | 726 incoming_messages_.emplace(std::move(message)); |
| 713 incoming_messages_flag_.Set(true); | 727 incoming_messages_flag_.Set(true); |
| 714 } else { | 728 } else { |
| 715 SendPeerMessage(node, std::move(message)); | 729 SendPeerMessage(node, std::move(message)); |
| 716 } | 730 } |
| 717 | 731 |
| 718 // |io_task_runner_| may be null in tests or processes that don't require | 732 if (schedule_pump_task) { |
| 719 // multi-process Mojo. | |
| 720 if (schedule_pump_task && io_task_runner_) { | |
| 721 // Normally, the queue is processed after the action that added the local | 733 // Normally, the queue is processed after the action that added the local |
| 722 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also | 734 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also |
| 723 // possible for a local message to be added as a result of a remote message, | 735 // possible for a local message to be added as a result of a remote message, |
| 724 // and OnChannelMessage() doesn't process this queue (although | 736 // and OnChannelMessage() doesn't process this queue (although |
| 725 // OnPortsMessage() does). There may also be other code paths, now or added | 737 // OnPortsMessage() does). There may also be other code paths, now or added |
| 726 // in the future, which cause local messages to be added but don't process | 738 // in the future, which cause local messages to be added but don't process |
| 727 // this message queue. | 739 // this message queue. |
| 728 // | 740 // |
| 729 // Instead of adding a call to AcceptIncomingMessages() on every possible | 741 // Instead of adding a call to AcceptIncomingMessages() on every possible |
| 730 // code path, post a task to the IO thread to process the queue. If the | 742 // code path, post a task to the IO thread to process the queue. If the |
| (...skipping 506 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1237 shutdown_callback_flag_.Set(false); | 1249 shutdown_callback_flag_.Set(false); |
| 1238 } | 1250 } |
| 1239 | 1251 |
| 1240 DCHECK(!callback.is_null()); | 1252 DCHECK(!callback.is_null()); |
| 1241 | 1253 |
| 1242 callback.Run(); | 1254 callback.Run(); |
| 1243 } | 1255 } |
| 1244 | 1256 |
| 1245 } // namespace edk | 1257 } // namespace edk |
| 1246 } // namespace mojo | 1258 } // namespace mojo |
| OLD | NEW |