Index: mojo/edk/system/ports/node.cc |
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc |
index 1718306fc672bab1f7e0660fd10fcaa732cbb0c1..e155fa80dbe9ad3097a5b2c6abe0c1cf42c042c2 100644 |
--- a/mojo/edk/system/ports/node.cc |
+++ b/mojo/edk/system/ports/node.cc |
@@ -354,10 +354,39 @@ int Node::AcceptMessage(ScopedMessage message) { |
return OnObserveClosure( |
header->port_name, |
GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); |
+ case EventType::kMergePort: |
+ return OnMergePort(header->port_name, |
+ *GetEventData<MergePortEventData>(*message)); |
} |
return OOPS(ERROR_NOT_IMPLEMENTED); |
} |
+int Node::MergePorts(const PortRef& port_ref, |
+ const NodeName& destination_node_name, |
+ const PortName& destination_port_name) { |
+ Port* port = port_ref.port(); |
+ { |
+ // |ports_lock_| must be held for WillSendPort_Locked below. |
+ base::AutoLock ports_lock(ports_lock_); |
+ base::AutoLock lock(port->lock); |
+ |
+ DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ |
+ << " to " << destination_port_name << "@" << destination_node_name; |
+ |
+ // Send the port-to-merge over to the destination node so it can be merged |
+ // into the port cycle atomically there. |
+ MergePortEventData data; |
+ data.new_port_name = port_ref.name(); |
+ WillSendPort_Locked(port, destination_node_name, &data.new_port_name, |
+ &data.new_port_descriptor); |
+ delegate_->ForwardMessage( |
+ destination_node_name, |
+ NewInternalMessage(destination_port_name, |
+ EventType::kMergePort, data)); |
+ } |
+ return OK; |
+} |
+ |
int Node::LostConnectionToNode(const NodeName& node_name) { |
// We can no longer send events to the given node. We also can't expect any |
// PortAccepted events. |
@@ -509,34 +538,8 @@ int Node::OnPortAccepted(const PortName& port_name) { |
<< " pointing to " |
<< port->peer_port_name << "@" << port->peer_node_name; |
- if (port->state != Port::kBuffering) |
- return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
- |
- port->state = Port::kProxying; |
- |
- int rv = ForwardMessages_Locked(port.get(), port_name); |
- if (rv != OK) |
- return rv; |
- |
- // We may have observed closure before receiving PortAccepted. In that |
- // case, we can advance to removing the proxy without sending out an |
- // ObserveProxy message. We already know the last expected message, etc. |
- |
- if (port->remove_proxy_on_last_message) { |
- MaybeRemoveProxy_Locked(port.get(), port_name); |
- |
- // Make sure we propagate closure to our current peer. |
- ObserveClosureEventData data; |
- data.last_sequence_num = port->last_sequence_num_to_receive; |
- delegate_->ForwardMessage( |
- port->peer_node_name, |
- NewInternalMessage(port->peer_port_name, |
- EventType::kObserveClosure, data)); |
- } else { |
- InitiateProxyRemoval_Locked(port.get(), port_name); |
- } |
+ return BeginProxying_Locked(port.get(), port_name); |
} |
- return OK; |
} |
int Node::OnObserveProxy(const PortName& port_name, |
@@ -721,6 +724,61 @@ int Node::OnObserveClosure(const PortName& port_name, |
return OK; |
} |
+int Node::OnMergePort(const PortName& port_name, |
+ const MergePortEventData& event) { |
+ scoped_refptr<Port> port = GetPort(port_name); |
+ if (!port) |
+ return ERROR_PORT_UNKNOWN; |
+ |
+ DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" |
+ << port->state << ") merging with proxy " << event.new_port_name |
+ << "@" << name_ << " pointing to " |
+ << event.new_port_descriptor.peer_port_name << "@" |
+ << event.new_port_descriptor.peer_node_name << " referred by " |
+ << event.new_port_descriptor.referring_port_name << "@" |
+ << event.new_port_descriptor.referring_node_name; |
+ |
+ // Accept the new port. This is now the receiving end of the other port cycle |
+ // to be merged with ours. |
+ int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); |
+ if (rv != OK) |
+ return rv; |
+ |
+ { |
+ // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn |
+ // needs to hold |ports_lock_|. We also acquire multiple port locks within. |
+ base::AutoLock ports_lock(ports_lock_); |
+ |
+ base::AutoLock lock(port->lock); |
+ |
+ if (port->state != Port::kReceiving) |
+ return ERROR_PORT_STATE_UNEXPECTED; |
+ |
+ scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); |
darin (slow to review)
2016/02/05 17:46:45
should we make some assertions about the state of
Ken Rockot(use gerrit already)
2016/02/05 19:43:05
done
|
+ base::AutoLock new_port_lock(new_port->lock); |
+ |
+ // Both ports are locked. Now all we have to do is swap their peer |
+ // information and set them up as proxies. |
+ |
+ std::swap(port->peer_node_name, new_port->peer_node_name); |
+ std::swap(port->peer_port_name, new_port->peer_port_name); |
+ |
+ port->state = Port::kBuffering; |
+ if (port->peer_closed) |
+ port->remove_proxy_on_last_message = true; |
+ |
+ new_port->state = Port::kBuffering; |
+ if (new_port->peer_closed) |
+ new_port->remove_proxy_on_last_message = true; |
+ |
+ rv = BeginProxying_Locked(port.get(), port_name); |
+ if (rv != OK) |
+ return rv; |
darin (slow to review)
2016/02/05 17:46:45
Are we in a bad state potentially if we early retu
Ken Rockot(use gerrit already)
2016/02/05 19:43:05
Yeah, I kinda glossed over this before. Changed it
|
+ |
+ return BeginProxying_Locked(new_port.get(), event.new_port_name); |
+ } |
+} |
+ |
int Node::AddPortWithName(const PortName& port_name, |
const scoped_refptr<Port>& port) { |
base::AutoLock lock(ports_lock_); |
@@ -908,6 +966,40 @@ int Node::WillSendMessage_Locked(Port* port, |
return OK; |
} |
+int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { |
+ ports_lock_.AssertAcquired(); |
+ port->lock.AssertAcquired(); |
+ |
+ if (port->state != Port::kBuffering) |
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
+ |
+ port->state = Port::kProxying; |
+ |
+ int rv = ForwardMessages_Locked(port, port_name); |
+ if (rv != OK) |
+ return rv; |
+ |
+ // We may have observed closure while buffering. In that case, we can advance |
+ // to removing the proxy without sending out an ObserveProxy message. We |
+ // already know the last expected message, etc. |
+ |
+ if (port->remove_proxy_on_last_message) { |
+ MaybeRemoveProxy_Locked(port, port_name); |
+ |
+ // Make sure we propagate closure to our current peer. |
+ ObserveClosureEventData data; |
+ data.last_sequence_num = port->last_sequence_num_to_receive; |
+ delegate_->ForwardMessage( |
+ port->peer_node_name, |
+ NewInternalMessage(port->peer_port_name, |
+ EventType::kObserveClosure, data)); |
+ } else { |
+ InitiateProxyRemoval_Locked(port, port_name); |
+ } |
+ |
+ return OK; |
+} |
+ |
int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
ports_lock_.AssertAcquired(); |
port->lock.AssertAcquired(); |