Index: mojo/edk/system/ports/node.cc |
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc |
index 9cfc51131dfb536cd1aada3bb28816b44b565245..1718306fc672bab1f7e0660fd10fcaa732cbb0c1 100644 |
--- a/mojo/edk/system/ports/node.cc |
+++ b/mojo/edk/system/ports/node.cc |
@@ -354,37 +354,8 @@ |
return OnObserveClosure( |
header->port_name, |
GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); |
- case EventType::kMergePort: |
- return OnMergePort(header->port_name, |
- *GetEventData<MergePortEventData>(*message)); |
} |
return OOPS(ERROR_NOT_IMPLEMENTED); |
-} |
- |
-int Node::MergePorts(const PortRef& port_ref, |
- const NodeName& destination_node_name, |
- const PortName& destination_port_name) { |
- Port* port = port_ref.port(); |
- { |
- // |ports_lock_| must be held for WillSendPort_Locked below. |
- base::AutoLock ports_lock(ports_lock_); |
- base::AutoLock lock(port->lock); |
- |
- DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ |
- << " to " << destination_port_name << "@" << destination_node_name; |
- |
- // Send the port-to-merge over to the destination node so it can be merged |
- // into the port cycle atomically there. |
- MergePortEventData data; |
- data.new_port_name = port_ref.name(); |
- WillSendPort_Locked(port, destination_node_name, &data.new_port_name, |
- &data.new_port_descriptor); |
- delegate_->ForwardMessage( |
- destination_node_name, |
- NewInternalMessage(destination_port_name, |
- EventType::kMergePort, data)); |
- } |
- return OK; |
} |
int Node::LostConnectionToNode(const NodeName& node_name) { |
@@ -538,8 +509,34 @@ |
<< " pointing to " |
<< port->peer_port_name << "@" << port->peer_node_name; |
- return BeginProxying_Locked(port.get(), port_name); |
- } |
+ if (port->state != Port::kBuffering) |
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
+ |
+ port->state = Port::kProxying; |
+ |
+ int rv = ForwardMessages_Locked(port.get(), port_name); |
+ if (rv != OK) |
+ return rv; |
+ |
+ // We may have observed closure before receiving PortAccepted. In that |
+ // case, we can advance to removing the proxy without sending out an |
+ // ObserveProxy message. We already know the last expected message, etc. |
+ |
+ if (port->remove_proxy_on_last_message) { |
+ MaybeRemoveProxy_Locked(port.get(), port_name); |
+ |
+ // Make sure we propagate closure to our current peer. |
+ ObserveClosureEventData data; |
+ data.last_sequence_num = port->last_sequence_num_to_receive; |
+ delegate_->ForwardMessage( |
+ port->peer_node_name, |
+ NewInternalMessage(port->peer_port_name, |
+ EventType::kObserveClosure, data)); |
+ } else { |
+ InitiateProxyRemoval_Locked(port.get(), port_name); |
+ } |
+ } |
+ return OK; |
} |
int Node::OnObserveProxy(const PortName& port_name, |
@@ -724,94 +721,6 @@ |
return OK; |
} |
-int Node::OnMergePort(const PortName& port_name, |
- const MergePortEventData& event) { |
- scoped_refptr<Port> port = GetPort(port_name); |
- if (!port) |
- return ERROR_PORT_UNKNOWN; |
- |
- DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" |
- << port->state << ") merging with proxy " << event.new_port_name |
- << "@" << name_ << " pointing to " |
- << event.new_port_descriptor.peer_port_name << "@" |
- << event.new_port_descriptor.peer_node_name << " referred by " |
- << event.new_port_descriptor.referring_port_name << "@" |
- << event.new_port_descriptor.referring_node_name; |
- |
- bool close_target_port = false; |
- bool close_new_port = false; |
- |
- // Accept the new port. This is now the receiving end of the other port cycle |
- // to be merged with ours. |
- int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); |
- if (rv != OK) { |
- close_target_port = true; |
- } else { |
- // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn |
- // needs to hold |ports_lock_|. We also acquire multiple port locks within. |
- base::AutoLock ports_lock(ports_lock_); |
- base::AutoLock lock(port->lock); |
- |
- if (port->state != Port::kReceiving) { |
- close_new_port = true; |
- } else { |
- scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); |
- base::AutoLock new_port_lock(new_port->lock); |
- DCHECK(new_port->state == Port::kReceiving); |
- |
- // Both ports are locked. Now all we have to do is swap their peer |
- // information and set them up as proxies. |
- |
- std::swap(port->peer_node_name, new_port->peer_node_name); |
- std::swap(port->peer_port_name, new_port->peer_port_name); |
- std::swap(port->peer_closed, new_port->peer_closed); |
- |
- port->state = Port::kBuffering; |
- if (port->peer_closed) |
- port->remove_proxy_on_last_message = true; |
- |
- new_port->state = Port::kBuffering; |
- if (new_port->peer_closed) |
- new_port->remove_proxy_on_last_message = true; |
- |
- int rv1 = BeginProxying_Locked(port.get(), port_name); |
- int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); |
- |
- if (rv1 == OK && rv2 == OK) |
- return OK; |
- |
- // If either proxy failed to initialize (e.g. had undeliverable messages |
- // or ended up in a bad state somehow), we keep the system in a consistent |
- // state by undoing the peer swap and closing both merge ports. |
- |
- std::swap(port->peer_node_name, new_port->peer_node_name); |
- std::swap(port->peer_port_name, new_port->peer_port_name); |
- port->state = Port::kReceiving; |
- new_port->state = Port::kReceiving; |
- close_new_port = true; |
- close_target_port = true; |
- } |
- } |
- |
- if (close_target_port) { |
- PortRef target_port; |
- rv = GetPort(port_name, &target_port); |
- DCHECK(rv == OK); |
- |
- ClosePort(target_port); |
- } |
- |
- if (close_new_port) { |
- PortRef new_port; |
- rv = GetPort(event.new_port_name, &new_port); |
- DCHECK(rv == OK); |
- |
- ClosePort(new_port); |
- } |
- |
- return ERROR_PORT_STATE_UNEXPECTED; |
-} |
- |
int Node::AddPortWithName(const PortName& port_name, |
const scoped_refptr<Port>& port) { |
base::AutoLock lock(ports_lock_); |
@@ -999,40 +908,6 @@ |
return OK; |
} |
-int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { |
- ports_lock_.AssertAcquired(); |
- port->lock.AssertAcquired(); |
- |
- if (port->state != Port::kBuffering) |
- return OOPS(ERROR_PORT_STATE_UNEXPECTED); |
- |
- port->state = Port::kProxying; |
- |
- int rv = ForwardMessages_Locked(port, port_name); |
- if (rv != OK) |
- return rv; |
- |
- // We may have observed closure while buffering. In that case, we can advance |
- // to removing the proxy without sending out an ObserveProxy message. We |
- // already know the last expected message, etc. |
- |
- if (port->remove_proxy_on_last_message) { |
- MaybeRemoveProxy_Locked(port, port_name); |
- |
- // Make sure we propagate closure to our current peer. |
- ObserveClosureEventData data; |
- data.last_sequence_num = port->last_sequence_num_to_receive; |
- delegate_->ForwardMessage( |
- port->peer_node_name, |
- NewInternalMessage(port->peer_port_name, |
- EventType::kObserveClosure, data)); |
- } else { |
- InitiateProxyRemoval_Locked(port, port_name); |
- } |
- |
- return OK; |
-} |
- |
int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { |
ports_lock_.AssertAcquired(); |
port->lock.AssertAcquired(); |