Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1383)

Unified Diff: mojo/edk/system/ports/node.cc

Issue 1785843002: [mojo] Implement pipe fusion API (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/public/c/system/message_pipe.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/ports/node.cc
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
index f26e8316a3717026b739a35bd4b74ab01819dc95..da2bdb09b65b723dd41bc85d900e9eb3abb68e61 100644
--- a/mojo/edk/system/ports/node.cc
+++ b/mojo/edk/system/ports/node.cc
@@ -388,6 +388,33 @@ int Node::MergePorts(const PortRef& port_ref,
return OK;
}
+int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
+ Port* port0 = port0_ref.port();
+ Port* port1 = port1_ref.port();
+ int rv;
+ {
+ // |ports_lock_| must be held when acquiring overlapping port locks.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock port0_lock(port0->lock);
+ base::AutoLock port1_lock(port1->lock);
+
+ DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
+ << " and " << port1_ref.name() << "@" << name_;
+
+ if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
+ rv = ERROR_PORT_STATE_UNEXPECTED;
+ else
+ rv = MergePorts_Locked(port0_ref, port1_ref);
+ }
+
+ if (rv != OK) {
+ ClosePort(port0_ref);
+ ClosePort(port1_ref);
+ }
+
+ return rv;
+}
+
int Node::LostConnectionToNode(const NodeName& node_name) {
// We can no longer send events to the given node. We also can't expect any
// PortAccepted events.
@@ -763,32 +790,12 @@ int Node::OnMergePort(const PortName& port_name,
// Both ports are locked. Now all we have to do is swap their peer
// information and set them up as proxies.
- std::swap(port->peer_node_name, new_port->peer_node_name);
- std::swap(port->peer_port_name, new_port->peer_port_name);
- std::swap(port->peer_closed, new_port->peer_closed);
-
- port->state = Port::kBuffering;
- if (port->peer_closed)
- port->remove_proxy_on_last_message = true;
-
- new_port->state = Port::kBuffering;
- if (new_port->peer_closed)
- new_port->remove_proxy_on_last_message = true;
-
- int rv1 = BeginProxying_Locked(port.get(), port_name);
- int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
+ PortRef port0_ref(port_name, port);
+ PortRef port1_ref(event.new_port_name, new_port);
+ int rv = MergePorts_Locked(port0_ref, port1_ref);
+ if (rv == OK)
+ return rv;
- if (rv1 == OK && rv2 == OK)
- return OK;
-
- // If either proxy failed to initialize (e.g. had undeliverable messages
- // or ended up in a bad state somehow), we keep the system in a consistent
- // state by undoing the peer swap and closing both merge ports.
-
- std::swap(port->peer_node_name, new_port->peer_node_name);
- std::swap(port->peer_port_name, new_port->peer_port_name);
- port->state = Port::kReceiving;
- new_port->state = Port::kReceiving;
close_new_port = true;
close_target_port = true;
}
@@ -849,6 +856,87 @@ scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
return iter->second;
}
+int Node::MergePorts_Locked(const PortRef& port0_ref,
+ const PortRef& port1_ref) {
+ Port* port0 = port0_ref.port();
+ Port* port1 = port1_ref.port();
+
+ ports_lock_.AssertAcquired();
+ port0->lock.AssertAcquired();
+ port1->lock.AssertAcquired();
+
+ CHECK(port0->state == Port::kReceiving);
+ CHECK(port1->state == Port::kReceiving);
+
+ // Ports cannot be merged with their own receiving peer!
+ if (port0->peer_node_name == name_ &&
+ port0->peer_port_name == port1_ref.name())
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ if (port1->peer_node_name == name_ &&
+ port1->peer_port_name == port0_ref.name())
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ // Only merge if both ports have never sent a message.
+ if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
+ port1->next_sequence_num_to_send == kInitialSequenceNum) {
+ // Swap the ports' peer information and switch them both into buffering
+ // (eventually proxying) mode.
+
+ std::swap(port0->peer_node_name, port1->peer_node_name);
+ std::swap(port0->peer_port_name, port1->peer_port_name);
+ std::swap(port0->peer_closed, port1->peer_closed);
+
+ port0->state = Port::kBuffering;
+ if (port0->peer_closed)
+ port0->remove_proxy_on_last_message = true;
+
+ port1->state = Port::kBuffering;
+ if (port1->peer_closed)
+ port1->remove_proxy_on_last_message = true;
+
+ int rv1 = BeginProxying_Locked(port0, port0_ref.name());
+ int rv2 = BeginProxying_Locked(port1, port1_ref.name());
+
+ if (rv1 == OK && rv2 == OK) {
+ // If either merged port had a closed peer, its new peer needs to be
+ // informed of this.
+ if (port1->peer_closed) {
+ ObserveClosureEventData data;
+ data.last_sequence_num = port0->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port0->peer_node_name,
+ NewInternalMessage(port0->peer_port_name,
+ EventType::kObserveClosure, data));
+ }
+
+ if (port0->peer_closed) {
+ ObserveClosureEventData data;
+ data.last_sequence_num = port1->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port1->peer_node_name,
+ NewInternalMessage(port1->peer_port_name,
+ EventType::kObserveClosure, data));
+ }
+
+ return OK;
+ }
+
+ // If either proxy failed to initialize (e.g. had undeliverable messages
+ // or ended up in a bad state somehow), we keep the system in a consistent
+ // state by undoing the peer swap.
+ std::swap(port0->peer_node_name, port1->peer_node_name);
+ std::swap(port0->peer_port_name, port1->peer_port_name);
+ std::swap(port0->peer_closed, port1->peer_closed);
+ port0->remove_proxy_on_last_message = false;
+ port1->remove_proxy_on_last_message = false;
+ port0->state = Port::kReceiving;
+ port1->state = Port::kReceiving;
+ }
+
+ return ERROR_PORT_STATE_UNEXPECTED;
+}
+
void Node::WillSendPort_Locked(Port* port,
const NodeName& to_node_name,
PortName* port_name,
« no previous file with comments | « mojo/edk/system/ports/node.h ('k') | mojo/public/c/system/message_pipe.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698