Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(171)

Unified Diff: mojo/edk/system/ports/node.cc

Issue 1675603002: [mojo-edk] Simplify multiprocess pipe bootstrap (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/ports/node.cc
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
index 1718306fc672bab1f7e0660fd10fcaa732cbb0c1..e155fa80dbe9ad3097a5b2c6abe0c1cf42c042c2 100644
--- a/mojo/edk/system/ports/node.cc
+++ b/mojo/edk/system/ports/node.cc
@@ -354,10 +354,39 @@ int Node::AcceptMessage(ScopedMessage message) {
return OnObserveClosure(
header->port_name,
GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
+ case EventType::kMergePort:
+ return OnMergePort(header->port_name,
+ *GetEventData<MergePortEventData>(*message));
}
return OOPS(ERROR_NOT_IMPLEMENTED);
}
+int Node::MergePorts(const PortRef& port_ref,
+ const NodeName& destination_node_name,
+ const PortName& destination_port_name) {
+ Port* port = port_ref.port();
+ {
+ // |ports_lock_| must be held for WillSendPort_Locked below.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock lock(port->lock);
+
+ DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
+ << " to " << destination_port_name << "@" << destination_node_name;
+
+ // Send the port-to-merge over to the destination node so it can be merged
+ // into the port cycle atomically there.
+ MergePortEventData data;
+ data.new_port_name = port_ref.name();
+ WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
+ &data.new_port_descriptor);
+ delegate_->ForwardMessage(
+ destination_node_name,
+ NewInternalMessage(destination_port_name,
+ EventType::kMergePort, data));
+ }
+ return OK;
+}
+
int Node::LostConnectionToNode(const NodeName& node_name) {
// We can no longer send events to the given node. We also can't expect any
// PortAccepted events.
@@ -509,34 +538,8 @@ int Node::OnPortAccepted(const PortName& port_name) {
<< " pointing to "
<< port->peer_port_name << "@" << port->peer_node_name;
- if (port->state != Port::kBuffering)
- return OOPS(ERROR_PORT_STATE_UNEXPECTED);
-
- port->state = Port::kProxying;
-
- int rv = ForwardMessages_Locked(port.get(), port_name);
- if (rv != OK)
- return rv;
-
- // We may have observed closure before receiving PortAccepted. In that
- // case, we can advance to removing the proxy without sending out an
- // ObserveProxy message. We already know the last expected message, etc.
-
- if (port->remove_proxy_on_last_message) {
- MaybeRemoveProxy_Locked(port.get(), port_name);
-
- // Make sure we propagate closure to our current peer.
- ObserveClosureEventData data;
- data.last_sequence_num = port->last_sequence_num_to_receive;
- delegate_->ForwardMessage(
- port->peer_node_name,
- NewInternalMessage(port->peer_port_name,
- EventType::kObserveClosure, data));
- } else {
- InitiateProxyRemoval_Locked(port.get(), port_name);
- }
+ return BeginProxying_Locked(port.get(), port_name);
}
- return OK;
}
int Node::OnObserveProxy(const PortName& port_name,
@@ -721,6 +724,61 @@ int Node::OnObserveClosure(const PortName& port_name,
return OK;
}
+int Node::OnMergePort(const PortName& port_name,
+ const MergePortEventData& event) {
+ scoped_refptr<Port> port = GetPort(port_name);
+ if (!port)
+ return ERROR_PORT_UNKNOWN;
+
+ DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
+ << port->state << ") merging with proxy " << event.new_port_name
+ << "@" << name_ << " pointing to "
+ << event.new_port_descriptor.peer_port_name << "@"
+ << event.new_port_descriptor.peer_node_name << " referred by "
+ << event.new_port_descriptor.referring_port_name << "@"
+ << event.new_port_descriptor.referring_node_name;
+
+ // Accept the new port. This is now the receiving end of the other port cycle
+ // to be merged with ours.
+ int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
+ if (rv != OK)
+ return rv;
+
+ {
+ // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
+ // needs to hold |ports_lock_|. We also acquire multiple port locks within.
+ base::AutoLock ports_lock(ports_lock_);
+
+ base::AutoLock lock(port->lock);
+
+ if (port->state != Port::kReceiving)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
darin (slow to review) 2016/02/05 17:46:45 should we make some assertions about the state of
Ken Rockot(use gerrit already) 2016/02/05 19:43:05 done
+ base::AutoLock new_port_lock(new_port->lock);
+
+ // Both ports are locked. Now all we have to do is swap their peer
+ // information and set them up as proxies.
+
+ std::swap(port->peer_node_name, new_port->peer_node_name);
+ std::swap(port->peer_port_name, new_port->peer_port_name);
+
+ port->state = Port::kBuffering;
+ if (port->peer_closed)
+ port->remove_proxy_on_last_message = true;
+
+ new_port->state = Port::kBuffering;
+ if (new_port->peer_closed)
+ new_port->remove_proxy_on_last_message = true;
+
+ rv = BeginProxying_Locked(port.get(), port_name);
+ if (rv != OK)
+ return rv;
darin (slow to review) 2016/02/05 17:46:45 Are we in a bad state potentially if we early retu
Ken Rockot(use gerrit already) 2016/02/05 19:43:05 Yeah, I kinda glossed over this before. Changed it
+
+ return BeginProxying_Locked(new_port.get(), event.new_port_name);
+ }
+}
+
int Node::AddPortWithName(const PortName& port_name,
const scoped_refptr<Port>& port) {
base::AutoLock lock(ports_lock_);
@@ -908,6 +966,40 @@ int Node::WillSendMessage_Locked(Port* port,
return OK;
}
+int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
+ ports_lock_.AssertAcquired();
+ port->lock.AssertAcquired();
+
+ if (port->state != Port::kBuffering)
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+
+ port->state = Port::kProxying;
+
+ int rv = ForwardMessages_Locked(port, port_name);
+ if (rv != OK)
+ return rv;
+
+ // We may have observed closure while buffering. In that case, we can advance
+ // to removing the proxy without sending out an ObserveProxy message. We
+ // already know the last expected message, etc.
+
+ if (port->remove_proxy_on_last_message) {
+ MaybeRemoveProxy_Locked(port, port_name);
+
+ // Make sure we propagate closure to our current peer.
+ ObserveClosureEventData data;
+ data.last_sequence_num = port->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port->peer_node_name,
+ NewInternalMessage(port->peer_port_name,
+ EventType::kObserveClosure, data));
+ } else {
+ InitiateProxyRemoval_Locked(port, port_name);
+ }
+
+ return OK;
+}
+
int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
ports_lock_.AssertAcquired();
port->lock.AssertAcquired();

Powered by Google App Engine
This is Rietveld 408576698