Chromium Code Reviews

Index: mojo/edk/system/ports/node.cc
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
index 1718306fc672bab1f7e0660fd10fcaa732cbb0c1..e155fa80dbe9ad3097a5b2c6abe0c1cf42c042c2 100644
--- a/mojo/edk/system/ports/node.cc
+++ b/mojo/edk/system/ports/node.cc
@@ -354,10 +354,39 @@ int Node::AcceptMessage(ScopedMessage message) {
       return OnObserveClosure(
           header->port_name,
           GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
+    case EventType::kMergePort:
+      return OnMergePort(header->port_name,
+                         *GetEventData<MergePortEventData>(*message));
   }
   return OOPS(ERROR_NOT_IMPLEMENTED);
 }
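The MergePortEventData payload dispatched above is not defined in this patch; as a rough sketch, its fields can be inferred from their uses in MergePorts and OnMergePort below (the actual definition lives in the ports event header and may differ):

    // Hypothetical reconstruction; field names are taken from their uses in
    // this patch, not from the real header.
    struct MergePortEventData {
      PortName new_port_name;              // The port being sent for merging.
      PortDescriptor new_port_descriptor;  // Peer and referrer info consumed
                                           // by AcceptPort() on the receiver.
    };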
+int Node::MergePorts(const PortRef& port_ref,
+                     const NodeName& destination_node_name,
+                     const PortName& destination_port_name) {
+  Port* port = port_ref.port();
+  {
+    // |ports_lock_| must be held for WillSendPort_Locked below.
+    base::AutoLock ports_lock(ports_lock_);
+    base::AutoLock lock(port->lock);
+
+    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
+             << " to " << destination_port_name << "@" << destination_node_name;
+
+    // Send the port-to-merge over to the destination node so it can be merged
+    // into the port cycle atomically there.
+    MergePortEventData data;
+    data.new_port_name = port_ref.name();
+    WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
+                        &data.new_port_descriptor);
+    delegate_->ForwardMessage(
+        destination_node_name,
+        NewInternalMessage(destination_port_name,
+                           EventType::kMergePort, data));
+  }
+  return OK;
+}
+
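Note the ordering above: |ports_lock_| is taken before the individual port's lock, the same order used in OnMergePort below. A schematic of that convention, using the names from this file:

    // Lock-ordering convention in node.cc: the node-wide |ports_lock_| is
    // acquired first, after which any number of per-port locks may be taken.
    {
      base::AutoLock ports_lock(ports_lock_);  // 1. node-wide ports lock
      base::AutoLock port_lock(port->lock);    // 2. then a per-port lock
      // ... mutate |port| and forward events while both locks are held ...
    }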
 int Node::LostConnectionToNode(const NodeName& node_name) {
   // We can no longer send events to the given node. We also can't expect any
   // PortAccepted events.
@@ -509,34 +538,8 @@ int Node::OnPortAccepted(const PortName& port_name) {
            << " pointing to "
            << port->peer_port_name << "@" << port->peer_node_name;
-    if (port->state != Port::kBuffering)
-      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
-
-    port->state = Port::kProxying;
-
-    int rv = ForwardMessages_Locked(port.get(), port_name);
-    if (rv != OK)
-      return rv;
-
-    // We may have observed closure before receiving PortAccepted. In that
-    // case, we can advance to removing the proxy without sending out an
-    // ObserveProxy message. We already know the last expected message, etc.
-
-    if (port->remove_proxy_on_last_message) {
-      MaybeRemoveProxy_Locked(port.get(), port_name);
-
-      // Make sure we propagate closure to our current peer.
-      ObserveClosureEventData data;
-      data.last_sequence_num = port->last_sequence_num_to_receive;
-      delegate_->ForwardMessage(
-          port->peer_node_name,
-          NewInternalMessage(port->peer_port_name,
-                             EventType::kObserveClosure, data));
-    } else {
-      InitiateProxyRemoval_Locked(port.get(), port_name);
-    }
+    return BeginProxying_Locked(port.get(), port_name);
   }
-  return OK;
 }
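With the buffering-to-proxying transition hoisted into BeginProxying_Locked, OnPortAccepted reduces to roughly the shape below. This is a sketch only: the port lookup and the acquisition of |ports_lock_| (which BeginProxying_Locked asserts) sit outside this hunk and are assumed from context.

    // Sketch of the post-refactor shape; details outside this hunk assumed.
    int Node::OnPortAccepted(const PortName& port_name) {
      scoped_refptr<Port> port = GetPort(port_name);
      if (!port)
        return ERROR_PORT_UNKNOWN;

      base::AutoLock ports_lock(ports_lock_);  // BeginProxying_Locked asserts
      base::AutoLock lock(port->lock);         // both of these locks.
      return BeginProxying_Locked(port.get(), port_name);
    }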
 int Node::OnObserveProxy(const PortName& port_name,
@@ -721,6 +724,61 @@ int Node::OnObserveClosure(const PortName& port_name,
   return OK;
 }
+int Node::OnMergePort(const PortName& port_name,
+                      const MergePortEventData& event) {
+  scoped_refptr<Port> port = GetPort(port_name);
+  if (!port)
+    return ERROR_PORT_UNKNOWN;
+
+  DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
+           << port->state << ") merging with proxy " << event.new_port_name
+           << "@" << name_ << " pointing to "
+           << event.new_port_descriptor.peer_port_name << "@"
+           << event.new_port_descriptor.peer_node_name << " referred by "
+           << event.new_port_descriptor.referring_port_name << "@"
+           << event.new_port_descriptor.referring_node_name;
+
+  // Accept the new port. This is now the receiving end of the other port cycle
+  // to be merged with ours.
+  int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
+  if (rv != OK)
+    return rv;
+
+  {
+    // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
+    // needs to hold |ports_lock_|. We also acquire multiple port locks within.
+    base::AutoLock ports_lock(ports_lock_);
+
+    base::AutoLock lock(port->lock);
+
+    if (port->state != Port::kReceiving)
+      return ERROR_PORT_STATE_UNEXPECTED;
+
+    scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);

darin (slow to review), 2016/02/05 17:46:45:
should we make some assertions about the state of…

Ken Rockot (use gerrit already), 2016/02/05 19:43:05:
done

+    base::AutoLock new_port_lock(new_port->lock);
+
+    // Both ports are locked. Now all we have to do is swap their peer
+    // information and set them up as proxies.
+
+    std::swap(port->peer_node_name, new_port->peer_node_name);
+    std::swap(port->peer_port_name, new_port->peer_port_name);
+
+    port->state = Port::kBuffering;
+    if (port->peer_closed)
+      port->remove_proxy_on_last_message = true;
+
+    new_port->state = Port::kBuffering;
+    if (new_port->peer_closed)
+      new_port->remove_proxy_on_last_message = true;
+
+    rv = BeginProxying_Locked(port.get(), port_name);
+    if (rv != OK)
+      return rv;

darin (slow to review), 2016/02/05 17:46:45:
Are we in a bad state potentially if we early return…

Ken Rockot (use gerrit already), 2016/02/05 19:43:05:
Yeah, I kinda glossed over this before. Changed it…

+
+    return BeginProxying_Locked(new_port.get(), event.new_port_name);
+  }
+}
+
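The heart of the merge is the pair of std::swap calls: each port's peer link is re-pointed at the other port's peer, splicing the two port cycles into a single cycle, with both merged ports left behind as buffering proxies. A self-contained toy illustration of the splice (ToyPort is hypothetical, not the real Port type):

    #include <string>
    #include <utility>

    // Toy stand-in for a port's peer link; the real Port carries far more
    // state (queues, sequence numbers, proxy bookkeeping, etc.).
    struct ToyPort {
      std::string peer_node;
      std::string peer_port;
    };

    // Swapping peer links joins the two cycles: traffic that used to
    // terminate at |a| now flows onward to |b|'s old peer, and vice versa.
    // Both ports then take the kBuffering -> kProxying path.
    void SplicePortCycles(ToyPort& a, ToyPort& b) {
      std::swap(a.peer_node, b.peer_node);
      std::swap(a.peer_port, b.peer_port);
    }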
 int Node::AddPortWithName(const PortName& port_name,
                           const scoped_refptr<Port>& port) {
   base::AutoLock lock(ports_lock_);
@@ -908,6 +966,40 @@ int Node::WillSendMessage_Locked(Port* port,
   return OK;
 }
+int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
+  ports_lock_.AssertAcquired();
+  port->lock.AssertAcquired();
+
+  if (port->state != Port::kBuffering)
+    return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+
+  port->state = Port::kProxying;
+
+  int rv = ForwardMessages_Locked(port, port_name);
+  if (rv != OK)
+    return rv;
+
+  // We may have observed closure while buffering. In that case, we can advance
+  // to removing the proxy without sending out an ObserveProxy message. We
+  // already know the last expected message, etc.
+
+  if (port->remove_proxy_on_last_message) {
+    MaybeRemoveProxy_Locked(port, port_name);
+
+    // Make sure we propagate closure to our current peer.
+    ObserveClosureEventData data;
+    data.last_sequence_num = port->last_sequence_num_to_receive;
+    delegate_->ForwardMessage(
+        port->peer_node_name,
+        NewInternalMessage(port->peer_port_name,
+                           EventType::kObserveClosure, data));
+  } else {
+    InitiateProxyRemoval_Locked(port, port_name);
+  }
+
+  return OK;
+}
+
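BeginProxying_Locked is the single place where a port may move from kBuffering to kProxying; any other starting state is a logic error. A condensed, compilable model of that transition rule (enum values taken from the states named in this patch; everything else is illustrative):

    // Minimal model of the transition BeginProxying_Locked enforces.
    enum class PortState { kReceiving, kBuffering, kProxying };

    // Returns false for the states that map to
    // OOPS(ERROR_PORT_STATE_UNEXPECTED) in the real code.
    bool BeginProxying(PortState& state) {
      if (state != PortState::kBuffering)
        return false;
      state = PortState::kProxying;
      return true;
    }

    int main() {
      PortState state = PortState::kBuffering;
      bool first = BeginProxying(state);  // true: kBuffering -> kProxying
      bool again = BeginProxying(state);  // false: already proxying
      return (first && !again) ? 0 : 1;
    }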
 int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
   ports_lock_.AssertAcquired();
   port->lock.AssertAcquired();