| Index: mojo/edk/system/node_controller.cc
|
| diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
|
| index 5fc55f557d7063cd50071b398a17f90bdc5d320d..e6ed580b6aa777eb64aa86ad9ec348660baf981f 100644
|
| --- a/mojo/edk/system/node_controller.cc
|
| +++ b/mojo/edk/system/node_controller.cc
|
| @@ -23,6 +23,7 @@
|
| #include "mojo/edk/system/broker_host.h"
|
| #include "mojo/edk/system/core.h"
|
| #include "mojo/edk/system/ports_message.h"
|
| +#include "mojo/edk/system/request_context.h"
|
|
|
| #if defined(OS_MACOSX) && !defined(OS_IOS)
|
| #include "mojo/edk/system/mach_port_relay.h"
|
| @@ -646,6 +647,11 @@ void NodeController::AcceptIncomingMessages() {
|
| AttemptShutdownIfRequested();
|
| }
|
|
|
| +void NodeController::ProcessIncomingMessages() {
|
| + RequestContext request_context(RequestContext::Source::SYSTEM);
|
| + AcceptIncomingMessages();
|
| +}
|
| +
|
| void NodeController::DropAllPeers() {
|
| DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
|
|
| @@ -693,17 +699,39 @@ void NodeController::AllocMessage(size_t num_header_bytes,
|
| void NodeController::ForwardMessage(const ports::NodeName& node,
|
| ports::ScopedMessage message) {
|
| DCHECK(message);
|
| + bool schedule_pump_task = false;
|
| if (node == name_) {
|
| // NOTE: We need to avoid re-entering the Node instance within
|
| // ForwardMessage. Because ForwardMessage is only ever called
|
| // (synchronously) in response to Node's ClosePort, SendMessage, or
|
| // AcceptMessage, we flush the queue after calling any of those methods.
|
| base::AutoLock lock(messages_lock_);
|
| + schedule_pump_task = incoming_messages_.empty();
|
| incoming_messages_.emplace(std::move(message));
|
| incoming_messages_flag_.Set(true);
|
| } else {
|
| SendPeerMessage(node, std::move(message));
|
| }
|
| +
|
| + // |io_task_runner_| may be null in tests or processes that don't require
|
| + // multi-process Mojo.
|
| + if (schedule_pump_task && io_task_runner_) {
|
| + // Normally, the queue is processed after the action that added the local
|
| + // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
|
| + // possible for a local message to be added as a result of a remote message,
|
| + // and OnChannelMessage() doesn't process this queue (although
|
| + // OnPortsMessage() does). There may also be other code paths, now or added
|
| + // in the future, which cause local messages to be added but don't process
|
| + // this message queue.
|
| + //
|
| + // Instead of adding a call to AcceptIncomingMessages() on every possible
|
| + // code path, post a task to the IO thread to process the queue. If the
|
| + // current call stack processes the queue, this may end up doing nothing.
|
| + io_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NodeController::ProcessIncomingMessages,
|
| + base::Unretained(this)));
|
| + }
|
| }
|
|
|
| void NodeController::BroadcastMessage(ports::ScopedMessage message) {
|
|
|