| Index: mojo/edk/system/ports/node.cc
|
| diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
|
| index 1718306fc672bab1f7e0660fd10fcaa732cbb0c1..9cfc51131dfb536cd1aada3bb28816b44b565245 100644
|
| --- a/mojo/edk/system/ports/node.cc
|
| +++ b/mojo/edk/system/ports/node.cc
|
| @@ -354,10 +354,39 @@ int Node::AcceptMessage(ScopedMessage message) {
|
| return OnObserveClosure(
|
| header->port_name,
|
| GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
|
| + case EventType::kMergePort:
|
| + return OnMergePort(header->port_name,
|
| + *GetEventData<MergePortEventData>(*message));
|
| }
|
| return OOPS(ERROR_NOT_IMPLEMENTED);
|
| }
|
|
|
| +int Node::MergePorts(const PortRef& port_ref,
|
| + const NodeName& destination_node_name,
|
| + const PortName& destination_port_name) {
|
| + Port* port = port_ref.port();
|
| + {
|
| + // |ports_lock_| must be held for WillSendPort_Locked below. It is taken before the port's own lock, matching the acquisition order used elsewhere in this file.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
|
| + << " to " << destination_port_name << "@" << destination_node_name;
|
| +
|
| + // Send the port-to-merge over to the destination node so it can be merged
|
| + // into the port cycle atomically there.
|
| + MergePortEventData data;
|
| + data.new_port_name = port_ref.name();
|
| + WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
|
| + &data.new_port_descriptor);
|
| + delegate_->ForwardMessage(
|
| + destination_node_name,
|
| + NewInternalMessage(destination_port_name,
|
| + EventType::kMergePort, data));
|
| + }
|
| + return OK;
|
| +}
|
| +
|
| int Node::LostConnectionToNode(const NodeName& node_name) {
|
| // We can no longer send events to the given node. We also can't expect any
|
| // PortAccepted events.
|
| @@ -509,34 +538,8 @@ int Node::OnPortAccepted(const PortName& port_name) {
|
| << " pointing to "
|
| << port->peer_port_name << "@" << port->peer_node_name;
|
|
|
| - if (port->state != Port::kBuffering)
|
| - return OOPS(ERROR_PORT_STATE_UNEXPECTED);
|
| -
|
| - port->state = Port::kProxying;
|
| -
|
| - int rv = ForwardMessages_Locked(port.get(), port_name);
|
| - if (rv != OK)
|
| - return rv;
|
| -
|
| - // We may have observed closure before receiving PortAccepted. In that
|
| - // case, we can advance to removing the proxy without sending out an
|
| - // ObserveProxy message. We already know the last expected message, etc.
|
| -
|
| - if (port->remove_proxy_on_last_message) {
|
| - MaybeRemoveProxy_Locked(port.get(), port_name);
|
| -
|
| - // Make sure we propagate closure to our current peer.
|
| - ObserveClosureEventData data;
|
| - data.last_sequence_num = port->last_sequence_num_to_receive;
|
| - delegate_->ForwardMessage(
|
| - port->peer_node_name,
|
| - NewInternalMessage(port->peer_port_name,
|
| - EventType::kObserveClosure, data));
|
| - } else {
|
| - InitiateProxyRemoval_Locked(port.get(), port_name);
|
| - }
|
| + return BeginProxying_Locked(port.get(), port_name);
|
| }
|
| - return OK;
|
| }
|
|
|
| int Node::OnObserveProxy(const PortName& port_name,
|
| @@ -721,6 +724,94 @@ int Node::OnObserveClosure(const PortName& port_name,
|
| return OK;
|
| }
|
|
|
| +int Node::OnMergePort(const PortName& port_name,
|
| + const MergePortEventData& event) {
|
| + scoped_refptr<Port> port = GetPort(port_name);
|
| + if (!port)
|
| + return ERROR_PORT_UNKNOWN;
|
| +
|
| + DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
|
| + << port->state << ") merging with proxy " << event.new_port_name
|
| + << "@" << name_ << " pointing to "
|
| + << event.new_port_descriptor.peer_port_name << "@"
|
| + << event.new_port_descriptor.peer_node_name << " referred by "
|
| + << event.new_port_descriptor.referring_port_name << "@"
|
| + << event.new_port_descriptor.referring_node_name;
|
| +
|
| + bool close_target_port = false;
|
| + bool close_new_port = false;
|
| +
|
| + // Accept the new port. This is now the receiving end of the other port cycle
|
| + // to be merged with ours.
|
| + int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
|
| + if (rv != OK) {
|
| + close_target_port = true;
|
| + } else {
|
| + // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
|
| + // needs to hold |ports_lock_|. We also acquire multiple port locks within.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + if (port->state != Port::kReceiving) {
|
| + close_new_port = true;
|
| + } else {
|
| + scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
|
| + base::AutoLock new_port_lock(new_port->lock);
|
| + DCHECK(new_port->state == Port::kReceiving);
|
| +
|
| + // Both ports are locked. Now all we have to do is swap their peer
|
| + // information and set them up as proxies.
|
| +
|
| + std::swap(port->peer_node_name, new_port->peer_node_name);
|
| + std::swap(port->peer_port_name, new_port->peer_port_name);
|
| + std::swap(port->peer_closed, new_port->peer_closed);
|
| +
|
| + port->state = Port::kBuffering;
|
| + if (port->peer_closed)
|
| + port->remove_proxy_on_last_message = true;
|
| +
|
| + new_port->state = Port::kBuffering;
|
| + if (new_port->peer_closed)
|
| + new_port->remove_proxy_on_last_message = true;
|
| +
|
| + int rv1 = BeginProxying_Locked(port.get(), port_name);
|
| + int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
|
| +
|
| + if (rv1 == OK && rv2 == OK)
|
| + return OK;
|
| +
|
| + // If either proxy failed to initialize (e.g. had undeliverable messages
|
| + // or ended up in a bad state somehow), swap the peer names back and close
|
| + // both merge ports. NOTE(review): peer_closed was swapped above but is not swapped back here, and remove_proxy_on_last_message is not cleared — confirm this is intended.
|
| +
|
| + std::swap(port->peer_node_name, new_port->peer_node_name);
|
| + std::swap(port->peer_port_name, new_port->peer_port_name);
|
| + port->state = Port::kReceiving;
|
| + new_port->state = Port::kReceiving;
|
| + close_new_port = true;
|
| + close_target_port = true;
|
| + }
|
| + }
|
| +
|
| + if (close_target_port) {
|
| + PortRef target_port;
|
| + rv = GetPort(port_name, &target_port);
|
| + DCHECK(rv == OK);
|
| +
|
| + ClosePort(target_port);
|
| + }
|
| +
|
| + if (close_new_port) {
|
| + PortRef new_port;
|
| + rv = GetPort(event.new_port_name, &new_port);
|
| + DCHECK(rv == OK);
|
| +
|
| + ClosePort(new_port);
|
| + }
|
| +
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +}
|
| +
|
| int Node::AddPortWithName(const PortName& port_name,
|
| const scoped_refptr<Port>& port) {
|
| base::AutoLock lock(ports_lock_);
|
| @@ -908,6 +999,40 @@ int Node::WillSendMessage_Locked(Port* port,
|
| return OK;
|
| }
|
|
|
| +int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
|
| + ports_lock_.AssertAcquired();
|
| + port->lock.AssertAcquired();
|
| +
|
| + if (port->state != Port::kBuffering)
|
| + return OOPS(ERROR_PORT_STATE_UNEXPECTED);
|
| +
|
| + port->state = Port::kProxying;
|
| +
|
| + int rv = ForwardMessages_Locked(port, port_name);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + // We may have observed closure while buffering. In that case, we can advance
|
| + // to removing the proxy without sending out an ObserveProxy message. We
|
| + // already know the last expected message, etc.
|
| +
|
| + if (port->remove_proxy_on_last_message) {
|
| + MaybeRemoveProxy_Locked(port, port_name);
|
| +
|
| + // Make sure we propagate closure to our current peer.
|
| + ObserveClosureEventData data;
|
| + data.last_sequence_num = port->last_sequence_num_to_receive;
|
| + delegate_->ForwardMessage(
|
| + port->peer_node_name,
|
| + NewInternalMessage(port->peer_port_name,
|
| + EventType::kObserveClosure, data));
|
| + } else {
|
| + InitiateProxyRemoval_Locked(port, port_name);
|
| + }
|
| +
|
| + return OK;
|
| +}
|
| +
|
| int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
|
| ports_lock_.AssertAcquired();
|
| port->lock.AssertAcquired();
|
|
|