Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(388)

Unified Diff: mojo/edk/system/node_controller.cc

Issue 1675603002: [mojo-edk] Simplify multiprocess pipe bootstrap (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix some callers to work with sync APIs Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/event.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/node_controller.cc
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
index 6410caef5f71213d8757f8228fd884db6c30f0eb..38c53df3b4cfc6d9e98f039e20a36c30fe12a346 100644
--- a/mojo/edk/system/node_controller.cc
+++ b/mojo/edk/system/node_controller.cc
@@ -99,18 +99,6 @@ class ThreadDestructionObserver :
} // namespace
-NodeController::PendingPortRequest::PendingPortRequest() {}
-
-NodeController::PendingPortRequest::~PendingPortRequest() {}
-
-NodeController::ReservedPort::ReservedPort() {}
-
-NodeController::ReservedPort::~ReservedPort() {}
-
-NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {}
-
-NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {}
-
NodeController::~NodeController() {}
NodeController::NodeController(Core* core)
@@ -182,18 +170,25 @@ int NodeController::SendMessage(const ports::PortRef& port,
}
void NodeController::ReservePort(const std::string& token,
- const ReservePortCallback& callback) {
- ports::PortRef port;
- node_->CreateUninitializedPort(&port);
-
+ const ports::PortRef& port) {
DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
<< token;
base::AutoLock lock(reserved_ports_lock_);
- ReservedPort reservation;
- reservation.local_port = port;
- reservation.callback = callback;
- reserved_ports_.insert(std::make_pair(token, reservation));
+ auto result = reserved_ports_.insert(std::make_pair(token, port));
+ DCHECK(result.second);
+}
+
+void NodeController::MergePortIntoParent(const std::string& token,
+ const ports::PortRef& port) {
+ scoped_refptr<NodeChannel> parent = GetParentChannel();
+ if (parent) {
+ parent->RequestPortMerge(port.name(), token);
+ return;
+ }
+
+ base::AutoLock lock(pending_port_merges_lock_);
+ pending_port_merges_.push_back(std::make_pair(token, port));
}
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
@@ -210,42 +205,6 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
return buffer;
}
-void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
- const std::string& token,
- const base::Closure& callback) {
- io_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
- base::Unretained(this), local_port, token, callback));
-}
-
-void NodeController::ConnectToRemotePort(
- const ports::PortRef& local_port,
- const ports::NodeName& remote_node_name,
- const ports::PortName& remote_port_name,
- const base::Closure& callback) {
- if (remote_node_name == name_) {
- // It's possible that two different code paths on the node are trying to
- // bootstrap ports to each other (e.g. in Chrome single-process mode)
- // without being aware of the fact. In this case we can initialize the port
- // immediately (which can fail silently if it's already been initialized by
- // the request on the other side), and invoke |callback|.
- node_->InitializePort(local_port, name_, remote_port_name);
- callback.Run();
- return;
- }
-
- PendingRemotePortConnection connection;
- connection.local_port = local_port;
- connection.remote_node_name = remote_node_name;
- connection.remote_port_name = remote_port_name;
- connection.callback = callback;
- io_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&NodeController::ConnectToRemotePortOnIOThread,
- base::Unretained(this), connection));
-}
-
void NodeController::RequestShutdown(const base::Closure& callback) {
{
base::AutoLock lock(shutdown_lock_);
@@ -304,46 +263,6 @@ void NodeController::ConnectToParentOnIOThread(
bootstrap_parent_channel_->Start();
}
-void NodeController::RequestParentPortConnectionOnIOThread(
- const ports::PortRef& local_port,
- const std::string& token,
- const base::Closure& callback) {
- DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
-
- scoped_refptr<NodeChannel> parent = GetParentChannel();
- if (!parent) {
- PendingPortRequest request;
- request.token = token;
- request.local_port = local_port;
- request.callback = callback;
- pending_port_requests_.push_back(request);
- return;
- }
-
- pending_parent_port_connections_.insert(
- std::make_pair(local_port.name(), callback));
- parent->RequestPortConnection(local_port.name(), token);
-}
-
-void NodeController::ConnectToRemotePortOnIOThread(
- const PendingRemotePortConnection& connection) {
- scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name);
- if (peer) {
- // It's safe to initialize the port since we already have a channel to its
- // peer. No need to actually send them a message.
- int rv = node_->InitializePort(connection.local_port,
- connection.remote_node_name,
- connection.remote_port_name);
- DCHECK_EQ(rv, ports::OK);
- connection.callback.Run();
- return;
- }
-
- // Save this for later. We'll initialize the port once this peer is added.
- pending_remote_port_connections_[connection.remote_node_name].push_back(
- connection);
-}
-
scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
const ports::NodeName& name) {
base::AutoLock lock(peers_lock_);
@@ -414,19 +333,6 @@ void NodeController::AddPeer(const ports::NodeName& name,
channel->PortsMessage(std::move(pending_messages.front()));
pending_messages.pop();
}
-
- // Complete any pending port connections to this peer.
- auto connections_it = pending_remote_port_connections_.find(name);
- if (connections_it != pending_remote_port_connections_.end()) {
- for (const auto& connection : connections_it->second) {
- int rv = node_->InitializePort(connection.local_port,
- connection.remote_node_name,
- connection.remote_port_name);
- DCHECK_EQ(rv, ports::OK);
- connection.callback.Run();
- }
- pending_remote_port_connections_.erase(connections_it);
- }
}
void NodeController::DropPeer(const ports::NodeName& name) {
@@ -616,6 +522,8 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node,
// NOTE: The child does not actually add its parent as a peer until
// receiving an AcceptBrokerClient message from the broker. The parent
// will request that said message be sent upon receiving AcceptParent.
+
+ DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
}
void NodeController::OnAcceptParent(const ports::NodeName& from_node,
@@ -763,15 +671,16 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
io_task_runner_);
AddPeer(broker_name, broker, true /* start_channel */);
}
+
AddPeer(parent_name, parent, false /* start_channel */);
- // Resolve any pending port connections to the parent.
- for (const auto& request : pending_port_requests_) {
- pending_parent_port_connections_.insert(
- std::make_pair(request.local_port.name(), request.callback));
- parent->RequestPortConnection(request.local_port.name(), request.token);
+ {
+ // Complete any port merge requests we have waiting for the parent.
+ base::AutoLock lock(pending_port_merges_lock_);
+ for (const auto& request : pending_port_merges_)
+ parent->RequestPortMerge(request.second.name(), request.first);
+ pending_port_merges_.clear();
}
- pending_port_requests_.clear();
// Feed the broker any pending children of our own.
while (!pending_broker_clients.empty()) {
@@ -824,16 +733,15 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
AttemptShutdownIfRequested();
}
-void NodeController::OnRequestPortConnection(
+void NodeController::OnRequestPortMerge(
const ports::NodeName& from_node,
const ports::PortName& connector_port_name,
const std::string& token) {
DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
- DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
+ DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
<< token << " and port " << connector_port_name << "@" << from_node;
- ReservePortCallback callback;
ports::PortRef local_port;
{
base::AutoLock lock(reserved_ports_lock_);
@@ -843,64 +751,12 @@ void NodeController::OnRequestPortConnection(
<< token;
return;
}
- local_port = it->second.local_port;
- callback = it->second.callback;
- reserved_ports_.erase(it);
- }
-
- DCHECK(!callback.is_null());
-
- scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
- if (!peer) {
- DVLOG(1) << "Ignoring request to connect to port from unknown node "
- << from_node;
- return;
+ local_port = it->second;
}
- // This reserved port should not have been initialized yet.
- CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
- connector_port_name));
-
- peer->ConnectToPort(local_port.name(), connector_port_name);
- callback.Run(local_port);
-}
-
-void NodeController::OnConnectToPort(
- const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const ports::PortName& connectee_port_name) {
- DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
-
- DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
- << connectee_port_name << " to port " << connector_port_name << "@"
- << from_node;
-
- ports::PortRef connectee_port;
- int rv = node_->GetPort(connectee_port_name, &connectee_port);
- if (rv != ports::OK) {
- DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
- << connectee_port_name;
- return;
- }
-
- // It's OK if this port has already been initialized. This message is only
- // sent by the remote peer to ensure the port is ready before it starts
- // us sending messages to it.
- ports::PortStatus port_status;
- rv = node_->GetStatus(connectee_port, &port_status);
- if (rv == ports::OK) {
- DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
- << connectee_port_name;
- return;
- }
-
- CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
- connector_port_name));
-
- auto it = pending_parent_port_connections_.find(connectee_port_name);
- DCHECK(it != pending_parent_port_connections_.end());
- it->second.Run();
- pending_parent_port_connections_.erase(it);
+ int rv = node_->MergePorts(local_port, from_node, connector_port_name);
+ if (rv != ports::OK)
+ DLOG(ERROR) << "MergePorts failed: " << rv;
}
void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/event.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698