| Index: mojo/edk/system/ports/node.cc
|
| diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
|
| index 9cfc51131dfb536cd1aada3bb28816b44b565245..1718306fc672bab1f7e0660fd10fcaa732cbb0c1 100644
|
| --- a/mojo/edk/system/ports/node.cc
|
| +++ b/mojo/edk/system/ports/node.cc
|
| @@ -354,37 +354,8 @@
|
| return OnObserveClosure(
|
| header->port_name,
|
| GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
|
| - case EventType::kMergePort:
|
| - return OnMergePort(header->port_name,
|
| - *GetEventData<MergePortEventData>(*message));
|
| }
|
| return OOPS(ERROR_NOT_IMPLEMENTED);
|
| -}
|
| -
|
| -int Node::MergePorts(const PortRef& port_ref,
|
| - const NodeName& destination_node_name,
|
| - const PortName& destination_port_name) {
|
| - Port* port = port_ref.port();
|
| - {
|
| - // |ports_lock_| must be held for WillSendPort_Locked below.
|
| - base::AutoLock ports_lock(ports_lock_);
|
| - base::AutoLock lock(port->lock);
|
| -
|
| - DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
|
| - << " to " << destination_port_name << "@" << destination_node_name;
|
| -
|
| - // Send the port-to-merge over to the destination node so it can be merged
|
| - // into the port cycle atomically there.
|
| - MergePortEventData data;
|
| - data.new_port_name = port_ref.name();
|
| - WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
|
| - &data.new_port_descriptor);
|
| - delegate_->ForwardMessage(
|
| - destination_node_name,
|
| - NewInternalMessage(destination_port_name,
|
| - EventType::kMergePort, data));
|
| - }
|
| - return OK;
|
| }
|
|
|
| int Node::LostConnectionToNode(const NodeName& node_name) {
|
| @@ -538,8 +509,34 @@
|
| << " pointing to "
|
| << port->peer_port_name << "@" << port->peer_node_name;
|
|
|
| - return BeginProxying_Locked(port.get(), port_name);
|
| - }
|
| + if (port->state != Port::kBuffering)
|
| + return OOPS(ERROR_PORT_STATE_UNEXPECTED);
|
| +
|
| + port->state = Port::kProxying;
|
| +
|
| + int rv = ForwardMessages_Locked(port.get(), port_name);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + // We may have observed closure before receiving PortAccepted. In that
|
| + // case, we can advance to removing the proxy without sending out an
|
| + // ObserveProxy message. We already know the last expected message, etc.
|
| +
|
| + if (port->remove_proxy_on_last_message) {
|
| + MaybeRemoveProxy_Locked(port.get(), port_name);
|
| +
|
| + // Make sure we propagate closure to our current peer.
|
| + ObserveClosureEventData data;
|
| + data.last_sequence_num = port->last_sequence_num_to_receive;
|
| + delegate_->ForwardMessage(
|
| + port->peer_node_name,
|
| + NewInternalMessage(port->peer_port_name,
|
| + EventType::kObserveClosure, data));
|
| + } else {
|
| + InitiateProxyRemoval_Locked(port.get(), port_name);
|
| + }
|
| + }
|
| + return OK;
|
| }
|
|
|
| int Node::OnObserveProxy(const PortName& port_name,
|
| @@ -724,94 +721,6 @@
|
| return OK;
|
| }
|
|
|
| -int Node::OnMergePort(const PortName& port_name,
|
| - const MergePortEventData& event) {
|
| - scoped_refptr<Port> port = GetPort(port_name);
|
| - if (!port)
|
| - return ERROR_PORT_UNKNOWN;
|
| -
|
| - DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
|
| - << port->state << ") merging with proxy " << event.new_port_name
|
| - << "@" << name_ << " pointing to "
|
| - << event.new_port_descriptor.peer_port_name << "@"
|
| - << event.new_port_descriptor.peer_node_name << " referred by "
|
| - << event.new_port_descriptor.referring_port_name << "@"
|
| - << event.new_port_descriptor.referring_node_name;
|
| -
|
| - bool close_target_port = false;
|
| - bool close_new_port = false;
|
| -
|
| - // Accept the new port. This is now the receiving end of the other port cycle
|
| - // to be merged with ours.
|
| - int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
|
| - if (rv != OK) {
|
| - close_target_port = true;
|
| - } else {
|
| - // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
|
| - // needs to hold |ports_lock_|. We also acquire multiple port locks within.
|
| - base::AutoLock ports_lock(ports_lock_);
|
| - base::AutoLock lock(port->lock);
|
| -
|
| - if (port->state != Port::kReceiving) {
|
| - close_new_port = true;
|
| - } else {
|
| - scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
|
| - base::AutoLock new_port_lock(new_port->lock);
|
| - DCHECK(new_port->state == Port::kReceiving);
|
| -
|
| - // Both ports are locked. Now all we have to do is swap their peer
|
| - // information and set them up as proxies.
|
| -
|
| - std::swap(port->peer_node_name, new_port->peer_node_name);
|
| - std::swap(port->peer_port_name, new_port->peer_port_name);
|
| - std::swap(port->peer_closed, new_port->peer_closed);
|
| -
|
| - port->state = Port::kBuffering;
|
| - if (port->peer_closed)
|
| - port->remove_proxy_on_last_message = true;
|
| -
|
| - new_port->state = Port::kBuffering;
|
| - if (new_port->peer_closed)
|
| - new_port->remove_proxy_on_last_message = true;
|
| -
|
| - int rv1 = BeginProxying_Locked(port.get(), port_name);
|
| - int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
|
| -
|
| - if (rv1 == OK && rv2 == OK)
|
| - return OK;
|
| -
|
| - // If either proxy failed to initialize (e.g. had undeliverable messages
|
| - // or ended up in a bad state somehow), we keep the system in a consistent
|
| - // state by undoing the peer swap and closing both merge ports.
|
| -
|
| - std::swap(port->peer_node_name, new_port->peer_node_name);
|
| - std::swap(port->peer_port_name, new_port->peer_port_name);
|
| - port->state = Port::kReceiving;
|
| - new_port->state = Port::kReceiving;
|
| - close_new_port = true;
|
| - close_target_port = true;
|
| - }
|
| - }
|
| -
|
| - if (close_target_port) {
|
| - PortRef target_port;
|
| - rv = GetPort(port_name, &target_port);
|
| - DCHECK(rv == OK);
|
| -
|
| - ClosePort(target_port);
|
| - }
|
| -
|
| - if (close_new_port) {
|
| - PortRef new_port;
|
| - rv = GetPort(event.new_port_name, &new_port);
|
| - DCHECK(rv == OK);
|
| -
|
| - ClosePort(new_port);
|
| - }
|
| -
|
| - return ERROR_PORT_STATE_UNEXPECTED;
|
| -}
|
| -
|
| int Node::AddPortWithName(const PortName& port_name,
|
| const scoped_refptr<Port>& port) {
|
| base::AutoLock lock(ports_lock_);
|
| @@ -999,40 +908,6 @@
|
| return OK;
|
| }
|
|
|
| -int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
|
| - ports_lock_.AssertAcquired();
|
| - port->lock.AssertAcquired();
|
| -
|
| - if (port->state != Port::kBuffering)
|
| - return OOPS(ERROR_PORT_STATE_UNEXPECTED);
|
| -
|
| - port->state = Port::kProxying;
|
| -
|
| - int rv = ForwardMessages_Locked(port, port_name);
|
| - if (rv != OK)
|
| - return rv;
|
| -
|
| - // We may have observed closure while buffering. In that case, we can advance
|
| - // to removing the proxy without sending out an ObserveProxy message. We
|
| - // already know the last expected message, etc.
|
| -
|
| - if (port->remove_proxy_on_last_message) {
|
| - MaybeRemoveProxy_Locked(port, port_name);
|
| -
|
| - // Make sure we propagate closure to our current peer.
|
| - ObserveClosureEventData data;
|
| - data.last_sequence_num = port->last_sequence_num_to_receive;
|
| - delegate_->ForwardMessage(
|
| - port->peer_node_name,
|
| - NewInternalMessage(port->peer_port_name,
|
| - EventType::kObserveClosure, data));
|
| - } else {
|
| - InitiateProxyRemoval_Locked(port, port_name);
|
| - }
|
| -
|
| - return OK;
|
| -}
|
| -
|
| int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
|
| ports_lock_.AssertAcquired();
|
| port->lock.AssertAcquired();
|
|
|