| Index: mojo/edk/system/node_controller.cc
|
| diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
|
| index 6410caef5f71213d8757f8228fd884db6c30f0eb..38c53df3b4cfc6d9e98f039e20a36c30fe12a346 100644
|
| --- a/mojo/edk/system/node_controller.cc
|
| +++ b/mojo/edk/system/node_controller.cc
|
| @@ -99,18 +99,6 @@ class ThreadDestructionObserver :
|
|
|
| } // namespace
|
|
|
| -NodeController::PendingPortRequest::PendingPortRequest() {}
|
| -
|
| -NodeController::PendingPortRequest::~PendingPortRequest() {}
|
| -
|
| -NodeController::ReservedPort::ReservedPort() {}
|
| -
|
| -NodeController::ReservedPort::~ReservedPort() {}
|
| -
|
| -NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {}
|
| -
|
| -NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {}
|
| -
|
| NodeController::~NodeController() {}
|
|
|
| NodeController::NodeController(Core* core)
|
| @@ -182,18 +170,25 @@ int NodeController::SendMessage(const ports::PortRef& port,
|
| }
|
|
|
| void NodeController::ReservePort(const std::string& token,
|
| - const ReservePortCallback& callback) {
|
| - ports::PortRef port;
|
| - node_->CreateUninitializedPort(&port);
|
| -
|
| + const ports::PortRef& port) {
|
| DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
|
| << token;
|
|
|
| base::AutoLock lock(reserved_ports_lock_);
|
| - ReservedPort reservation;
|
| - reservation.local_port = port;
|
| - reservation.callback = callback;
|
| - reserved_ports_.insert(std::make_pair(token, reservation));
|
| + auto result = reserved_ports_.insert(std::make_pair(token, port));
|
| + DCHECK(result.second);
|
| +}
|
| +
|
| +void NodeController::MergePortIntoParent(const std::string& token,
|
| + const ports::PortRef& port) {
|
| + scoped_refptr<NodeChannel> parent = GetParentChannel();
|
| + if (parent) {
|
| + parent->RequestPortMerge(port.name(), token);
|
| + return;
|
| + }
|
| +
|
| + base::AutoLock lock(pending_port_merges_lock_);
|
| + pending_port_merges_.push_back(std::make_pair(token, port));
|
| }
|
|
|
| scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
|
| @@ -210,42 +205,6 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
|
| return buffer;
|
| }
|
|
|
| -void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
|
| - const std::string& token,
|
| - const base::Closure& callback) {
|
| - io_task_runner_->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
|
| - base::Unretained(this), local_port, token, callback));
|
| -}
|
| -
|
| -void NodeController::ConnectToRemotePort(
|
| - const ports::PortRef& local_port,
|
| - const ports::NodeName& remote_node_name,
|
| - const ports::PortName& remote_port_name,
|
| - const base::Closure& callback) {
|
| - if (remote_node_name == name_) {
|
| - // It's possible that two different code paths on the node are trying to
|
| - // bootstrap ports to each other (e.g. in Chrome single-process mode)
|
| - // without being aware of the fact. In this case we can initialize the port
|
| - // immediately (which can fail silently if it's already been initialized by
|
| - // the request on the other side), and invoke |callback|.
|
| - node_->InitializePort(local_port, name_, remote_port_name);
|
| - callback.Run();
|
| - return;
|
| - }
|
| -
|
| - PendingRemotePortConnection connection;
|
| - connection.local_port = local_port;
|
| - connection.remote_node_name = remote_node_name;
|
| - connection.remote_port_name = remote_port_name;
|
| - connection.callback = callback;
|
| - io_task_runner_->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&NodeController::ConnectToRemotePortOnIOThread,
|
| - base::Unretained(this), connection));
|
| -}
|
| -
|
| void NodeController::RequestShutdown(const base::Closure& callback) {
|
| {
|
| base::AutoLock lock(shutdown_lock_);
|
| @@ -304,46 +263,6 @@ void NodeController::ConnectToParentOnIOThread(
|
| bootstrap_parent_channel_->Start();
|
| }
|
|
|
| -void NodeController::RequestParentPortConnectionOnIOThread(
|
| - const ports::PortRef& local_port,
|
| - const std::string& token,
|
| - const base::Closure& callback) {
|
| - DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| -
|
| - scoped_refptr<NodeChannel> parent = GetParentChannel();
|
| - if (!parent) {
|
| - PendingPortRequest request;
|
| - request.token = token;
|
| - request.local_port = local_port;
|
| - request.callback = callback;
|
| - pending_port_requests_.push_back(request);
|
| - return;
|
| - }
|
| -
|
| - pending_parent_port_connections_.insert(
|
| - std::make_pair(local_port.name(), callback));
|
| - parent->RequestPortConnection(local_port.name(), token);
|
| -}
|
| -
|
| -void NodeController::ConnectToRemotePortOnIOThread(
|
| - const PendingRemotePortConnection& connection) {
|
| - scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name);
|
| - if (peer) {
|
| - // It's safe to initialize the port since we already have a channel to its
|
| - // peer. No need to actually send them a message.
|
| - int rv = node_->InitializePort(connection.local_port,
|
| - connection.remote_node_name,
|
| - connection.remote_port_name);
|
| - DCHECK_EQ(rv, ports::OK);
|
| - connection.callback.Run();
|
| - return;
|
| - }
|
| -
|
| - // Save this for later. We'll initialize the port once this peer is added.
|
| - pending_remote_port_connections_[connection.remote_node_name].push_back(
|
| - connection);
|
| -}
|
| -
|
| scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
|
| const ports::NodeName& name) {
|
| base::AutoLock lock(peers_lock_);
|
| @@ -414,19 +333,6 @@ void NodeController::AddPeer(const ports::NodeName& name,
|
| channel->PortsMessage(std::move(pending_messages.front()));
|
| pending_messages.pop();
|
| }
|
| -
|
| - // Complete any pending port connections to this peer.
|
| - auto connections_it = pending_remote_port_connections_.find(name);
|
| - if (connections_it != pending_remote_port_connections_.end()) {
|
| - for (const auto& connection : connections_it->second) {
|
| - int rv = node_->InitializePort(connection.local_port,
|
| - connection.remote_node_name,
|
| - connection.remote_port_name);
|
| - DCHECK_EQ(rv, ports::OK);
|
| - connection.callback.Run();
|
| - }
|
| - pending_remote_port_connections_.erase(connections_it);
|
| - }
|
| }
|
|
|
| void NodeController::DropPeer(const ports::NodeName& name) {
|
| @@ -616,6 +522,8 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node,
|
| // NOTE: The child does not actually add its parent as a peer until
|
| // receiving an AcceptBrokerClient message from the broker. The parent
|
| // will request that said message be sent upon receiving AcceptParent.
|
| +
|
| + DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
|
| }
|
|
|
| void NodeController::OnAcceptParent(const ports::NodeName& from_node,
|
| @@ -763,15 +671,16 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
|
| io_task_runner_);
|
| AddPeer(broker_name, broker, true /* start_channel */);
|
| }
|
| +
|
| AddPeer(parent_name, parent, false /* start_channel */);
|
|
|
| - // Resolve any pending port connections to the parent.
|
| - for (const auto& request : pending_port_requests_) {
|
| - pending_parent_port_connections_.insert(
|
| - std::make_pair(request.local_port.name(), request.callback));
|
| - parent->RequestPortConnection(request.local_port.name(), request.token);
|
| + {
|
| + // Complete any port merge requests we have waiting for the parent.
|
| + base::AutoLock lock(pending_port_merges_lock_);
|
| + for (const auto& request : pending_port_merges_)
|
| + parent->RequestPortMerge(request.second.name(), request.first);
|
| + pending_port_merges_.clear();
|
| }
|
| - pending_port_requests_.clear();
|
|
|
| // Feed the broker any pending children of our own.
|
| while (!pending_broker_clients.empty()) {
|
| @@ -824,16 +733,15 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
|
| AttemptShutdownIfRequested();
|
| }
|
|
|
| -void NodeController::OnRequestPortConnection(
|
| +void NodeController::OnRequestPortMerge(
|
| const ports::NodeName& from_node,
|
| const ports::PortName& connector_port_name,
|
| const std::string& token) {
|
| DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
|
|
| - DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
|
| + DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
|
| << token << " and port " << connector_port_name << "@" << from_node;
|
|
|
| - ReservePortCallback callback;
|
| ports::PortRef local_port;
|
| {
|
| base::AutoLock lock(reserved_ports_lock_);
|
| @@ -843,64 +751,12 @@ void NodeController::OnRequestPortConnection(
|
| << token;
|
| return;
|
| }
|
| - local_port = it->second.local_port;
|
| - callback = it->second.callback;
|
| - reserved_ports_.erase(it);
|
| - }
|
| -
|
| - DCHECK(!callback.is_null());
|
| -
|
| - scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
|
| - if (!peer) {
|
| - DVLOG(1) << "Ignoring request to connect to port from unknown node "
|
| - << from_node;
|
| - return;
|
| + local_port = it->second;
|
| }
|
|
|
| - // This reserved port should not have been initialized yet.
|
| - CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
|
| - connector_port_name));
|
| -
|
| - peer->ConnectToPort(local_port.name(), connector_port_name);
|
| - callback.Run(local_port);
|
| -}
|
| -
|
| -void NodeController::OnConnectToPort(
|
| - const ports::NodeName& from_node,
|
| - const ports::PortName& connector_port_name,
|
| - const ports::PortName& connectee_port_name) {
|
| - DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| -
|
| - DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
|
| - << connectee_port_name << " to port " << connector_port_name << "@"
|
| - << from_node;
|
| -
|
| - ports::PortRef connectee_port;
|
| - int rv = node_->GetPort(connectee_port_name, &connectee_port);
|
| - if (rv != ports::OK) {
|
| - DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
|
| - << connectee_port_name;
|
| - return;
|
| - }
|
| -
|
| - // It's OK if this port has already been initialized. This message is only
|
| - // sent by the remote peer to ensure the port is ready before it starts
|
| - // us sending messages to it.
|
| - ports::PortStatus port_status;
|
| - rv = node_->GetStatus(connectee_port, &port_status);
|
| - if (rv == ports::OK) {
|
| - DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
|
| - << connectee_port_name;
|
| - return;
|
| - }
|
| -
|
| - CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
|
| - connector_port_name));
|
| -
|
| - auto it = pending_parent_port_connections_.find(connectee_port_name);
|
| - DCHECK(it != pending_parent_port_connections_.end());
|
| - it->second.Run();
|
| - pending_parent_port_connections_.erase(it);
|
| + int rv = node_->MergePorts(local_port, from_node, connector_port_name);
|
| + if (rv != ports::OK)
|
| + DLOG(ERROR) << "MergePorts failed: " << rv;
|
| }
|
|
|
| void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
|
|
|