| Index: mojo/edk/system/ports/node.cc
|
| diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
|
| index f26e8316a3717026b739a35bd4b74ab01819dc95..da2bdb09b65b723dd41bc85d900e9eb3abb68e61 100644
|
| --- a/mojo/edk/system/ports/node.cc
|
| +++ b/mojo/edk/system/ports/node.cc
|
| @@ -388,6 +388,33 @@ int Node::MergePorts(const PortRef& port_ref,
|
| return OK;
|
| }
|
|
|
| +int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
|
| + Port* port0 = port0_ref.port();
|
| + Port* port1 = port1_ref.port();
|
| + int rv;
|
| + {
|
| + // |ports_lock_| must be held when acquiring overlapping port locks.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| + base::AutoLock port0_lock(port0->lock);
|
| + base::AutoLock port1_lock(port1->lock);
|
| +
|
| + DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
|
| + << " and " << port1_ref.name() << "@" << name_;
|
| +
|
| + if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
|
| + rv = ERROR_PORT_STATE_UNEXPECTED;
|
| + else
|
| + rv = MergePorts_Locked(port0_ref, port1_ref);
|
| + }
|
| +
|
| + if (rv != OK) {
|
| + ClosePort(port0_ref);
|
| + ClosePort(port1_ref);
|
| + }
|
| +
|
| + return rv;
|
| +}
|
| +
|
| int Node::LostConnectionToNode(const NodeName& node_name) {
|
| // We can no longer send events to the given node. We also can't expect any
|
| // PortAccepted events.
|
| @@ -763,32 +790,12 @@ int Node::OnMergePort(const PortName& port_name,
|
| // Both ports are locked. Now all we have to do is swap their peer
|
| // information and set them up as proxies.
|
|
|
| - std::swap(port->peer_node_name, new_port->peer_node_name);
|
| - std::swap(port->peer_port_name, new_port->peer_port_name);
|
| - std::swap(port->peer_closed, new_port->peer_closed);
|
| -
|
| - port->state = Port::kBuffering;
|
| - if (port->peer_closed)
|
| - port->remove_proxy_on_last_message = true;
|
| -
|
| - new_port->state = Port::kBuffering;
|
| - if (new_port->peer_closed)
|
| - new_port->remove_proxy_on_last_message = true;
|
| -
|
| - int rv1 = BeginProxying_Locked(port.get(), port_name);
|
| - int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
|
| + PortRef port0_ref(port_name, port);
|
| + PortRef port1_ref(event.new_port_name, new_port);
|
| + int rv = MergePorts_Locked(port0_ref, port1_ref);
|
| + if (rv == OK)
|
| + return rv;
|
|
|
| - if (rv1 == OK && rv2 == OK)
|
| - return OK;
|
| -
|
| - // If either proxy failed to initialize (e.g. had undeliverable messages
|
| - // or ended up in a bad state somehow), we keep the system in a consistent
|
| - // state by undoing the peer swap and closing both merge ports.
|
| -
|
| - std::swap(port->peer_node_name, new_port->peer_node_name);
|
| - std::swap(port->peer_port_name, new_port->peer_port_name);
|
| - port->state = Port::kReceiving;
|
| - new_port->state = Port::kReceiving;
|
| close_new_port = true;
|
| close_target_port = true;
|
| }
|
| @@ -849,6 +856,87 @@ scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
|
| return iter->second;
|
| }
|
|
|
| +int Node::MergePorts_Locked(const PortRef& port0_ref,
|
| + const PortRef& port1_ref) {
|
| + Port* port0 = port0_ref.port();
|
| + Port* port1 = port1_ref.port();
|
| +
|
| + ports_lock_.AssertAcquired();
|
| + port0->lock.AssertAcquired();
|
| + port1->lock.AssertAcquired();
|
| +
|
| + CHECK(port0->state == Port::kReceiving);
|
| + CHECK(port1->state == Port::kReceiving);
|
| +
|
| + // Ports cannot be merged with their own receiving peer!
|
| + if (port0->peer_node_name == name_ &&
|
| + port0->peer_port_name == port1_ref.name())
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + if (port1->peer_node_name == name_ &&
|
| + port1->peer_port_name == port0_ref.name())
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + // Only merge if both ports have never sent a message.
|
| + if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
|
| + port1->next_sequence_num_to_send == kInitialSequenceNum) {
|
| + // Swap the ports' peer information and switch them both into buffering
|
| + // (eventually proxying) mode.
|
| +
|
| + std::swap(port0->peer_node_name, port1->peer_node_name);
|
| + std::swap(port0->peer_port_name, port1->peer_port_name);
|
| + std::swap(port0->peer_closed, port1->peer_closed);
|
| +
|
| + port0->state = Port::kBuffering;
|
| + if (port0->peer_closed)
|
| + port0->remove_proxy_on_last_message = true;
|
| +
|
| + port1->state = Port::kBuffering;
|
| + if (port1->peer_closed)
|
| + port1->remove_proxy_on_last_message = true;
|
| +
|
| + int rv1 = BeginProxying_Locked(port0, port0_ref.name());
|
| + int rv2 = BeginProxying_Locked(port1, port1_ref.name());
|
| +
|
| + if (rv1 == OK && rv2 == OK) {
|
| + // If either merged port had a closed peer, its new peer needs to be
|
| + // informed of this.
|
| + if (port1->peer_closed) {
|
| + ObserveClosureEventData data;
|
| + data.last_sequence_num = port0->last_sequence_num_to_receive;
|
| + delegate_->ForwardMessage(
|
| + port0->peer_node_name,
|
| + NewInternalMessage(port0->peer_port_name,
|
| + EventType::kObserveClosure, data));
|
| + }
|
| +
|
| + if (port0->peer_closed) {
|
| + ObserveClosureEventData data;
|
| + data.last_sequence_num = port1->last_sequence_num_to_receive;
|
| + delegate_->ForwardMessage(
|
| + port1->peer_node_name,
|
| + NewInternalMessage(port1->peer_port_name,
|
| + EventType::kObserveClosure, data));
|
| + }
|
| +
|
| + return OK;
|
| + }
|
| +
|
| + // If either proxy failed to initialize (e.g. had undeliverable messages
|
| + // or ended up in a bad state somehow), we keep the system in a consistent
|
| + // state by undoing the peer swap.
|
| + std::swap(port0->peer_node_name, port1->peer_node_name);
|
| + std::swap(port0->peer_port_name, port1->peer_port_name);
|
| + std::swap(port0->peer_closed, port1->peer_closed);
|
| + port0->remove_proxy_on_last_message = false;
|
| + port1->remove_proxy_on_last_message = false;
|
| + port0->state = Port::kReceiving;
|
| + port1->state = Port::kReceiving;
|
| + }
|
| +
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +}
|
| +
|
| void Node::WillSendPort_Locked(Port* port,
|
| const NodeName& to_node_name,
|
| PortName* port_name,
|
|
|