Chromium Code Reviews

Unified Diff: mojo/edk/system/ports/node.cc

Issue 1585493002: [mojo] Ports EDK (Closed)
Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Index: mojo/edk/system/ports/node.cc
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
new file mode 100644
index 0000000000000000000000000000000000000000..1718306fc672bab1f7e0660fd10fcaa732cbb0c1
--- /dev/null
+++ b/mojo/edk/system/ports/node.cc
@@ -0,0 +1,998 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/ports/node.h"
+
+#include <string.h>
+
+#include "base/logging.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/lock.h"
+#include "mojo/edk/system/ports/node_delegate.h"
+
+namespace mojo {
+namespace edk {
+namespace ports {
+
+namespace {
+
+int DebugError(const char* message, int error_code) {
+ CHECK(false) << "Oops: " << message;
+ return error_code;
+}
+
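+// Routes the stringified error expression through DebugError(), yielding the
+// error code. Used on return paths that indicate a bug or protocol violation.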
+#define OOPS(x) DebugError(#x, x)
+
+bool CanAcceptMoreMessages(const Port* port) {
+ // Have we already doled out the last message (i.e., do we expect to NOT
+ // receive further messages)?
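+ // For example, if the last expected sequence number is 5 and the next
+ // sequence number to be read is 6, the final message has been consumed.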
+ uint64_t next_sequence_num = port->message_queue.next_sequence_num();
+ if (port->peer_closed || port->remove_proxy_on_last_message) {
+ if (port->last_sequence_num_to_receive == next_sequence_num - 1)
+ return false;
+ }
+ return true;
+}
+
+} // namespace
+
+Node::Node(const NodeName& name, NodeDelegate* delegate)
+ : name_(name),
+ delegate_(delegate) {
+}
+
+Node::~Node() {
+ if (!ports_.empty())
+ DLOG(WARNING) << "Unclean shutdown for node " << name_;
+}
+
+bool Node::CanShutdownCleanly(bool allow_local_ports) {
+ base::AutoLock ports_lock(ports_lock_);
+
+ if (!allow_local_ports) {
+#if !defined(NDEBUG)
+ for (const auto& entry : ports_) {
+ DVLOG(2) << "Port " << entry.first << " referencing node "
+ << entry.second->peer_node_name << " is blocking shutdown of "
+ << "node " << name_ << " (state=" << entry.second->state << ")";
+ }
+#endif
+ return ports_.empty();
+ }
+
+ // NOTE: This is not efficient, though it probably doesn't need to be since
+ // relatively few ports should be open during shutdown and shutdown doesn't
+ // need to be blazingly fast.
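+ // A port blocks shutdown only if its peer lives on another node and the
+ // port is not in the receiving state.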
+ bool can_shutdown = true;
+ for (const auto& entry : ports_) {
+ base::AutoLock lock(entry.second->lock);
+ if (entry.second->peer_node_name != name_ &&
+ entry.second->state != Port::kReceiving) {
+ can_shutdown = false;
+#if !defined(NDEBUG)
+ DVLOG(2) << "Port " << entry.first << " referencing node "
+ << entry.second->peer_node_name << " is blocking shutdown of "
+ << "node " << name_ << " (state=" << entry.second->state << ")";
+#else
+ // Exit early when not debugging.
+ break;
+#endif
+ }
+ }
+
+ return can_shutdown;
+}
+
+int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
+ scoped_refptr<Port> port = GetPort(port_name);
+ if (!port)
+ return ERROR_PORT_UNKNOWN;
+
+ *port_ref = PortRef(port_name, std::move(port));
+ return OK;
+}
+
+int Node::CreateUninitializedPort(PortRef* port_ref) {
+ PortName port_name;
+ delegate_->GenerateRandomPortName(&port_name);
+
+ scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
+ kInitialSequenceNum));
+ int rv = AddPortWithName(port_name, port);
+ if (rv != OK)
+ return rv;
+
+ *port_ref = PortRef(port_name, std::move(port));
+ return OK;
+}
+
+int Node::InitializePort(const PortRef& port_ref,
+ const NodeName& peer_node_name,
+ const PortName& peer_port_name) {
+ Port* port = port_ref.port();
+
+ {
+ base::AutoLock lock(port->lock);
+ if (port->state != Port::kUninitialized)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ port->state = Port::kReceiving;
+ port->peer_node_name = peer_node_name;
+ port->peer_port_name = peer_port_name;
+ }
+
+ delegate_->PortStatusChanged(port_ref);
+
+ return OK;
+}
+
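+// Creates two ports on this node which are initialized as each other's peer,
+// forming a complete local route between them.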
+int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
+ int rv;
+
+ rv = CreateUninitializedPort(port0_ref);
+ if (rv != OK)
+ return rv;
+
+ rv = CreateUninitializedPort(port1_ref);
+ if (rv != OK)
+ return rv;
+
+ rv = InitializePort(*port0_ref, name_, port1_ref->name());
+ if (rv != OK)
+ return rv;
+
+ rv = InitializePort(*port1_ref, name_, port0_ref->name());
+ if (rv != OK)
+ return rv;
+
+ return OK;
+}
+
+int Node::SetUserData(const PortRef& port_ref,
+ const scoped_refptr<UserData>& user_data) {
+ Port* port = port_ref.port();
+
+ base::AutoLock lock(port->lock);
+ if (port->state == Port::kClosed)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ port->user_data = user_data;
+
+ return OK;
+}
+
+int Node::GetUserData(const PortRef& port_ref,
+ scoped_refptr<UserData>* user_data) {
+ Port* port = port_ref.port();
+
+ base::AutoLock lock(port->lock);
+ if (port->state == Port::kClosed)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ *user_data = port->user_data;
+
+ return OK;
+}
+
+int Node::ClosePort(const PortRef& port_ref) {
+ std::deque<PortName> referenced_port_names;
+
+ ObserveClosureEventData data;
+
+ NodeName peer_node_name;
+ PortName peer_port_name;
+ Port* port = port_ref.port();
+ {
+ // We may need to erase the port, which requires ports_lock_ to be held,
+ // but ports_lock_ must be acquired before any individual port locks.
+ base::AutoLock ports_lock(ports_lock_);
+
+ base::AutoLock lock(port->lock);
+ if (port->state == Port::kUninitialized) {
+ // If the port was not yet initialized, there's nothing interesting to do.
+ ErasePort_Locked(port_ref.name());
+ return OK;
+ }
+
+ if (port->state != Port::kReceiving)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ port->state = Port::kClosed;
+
+ // We pass along the sequence number of the last message sent from this
+ // port to allow the peer to have the opportunity to consume all inbound
+ // messages before notifying the embedder that this port is closed.
+ data.last_sequence_num = port->next_sequence_num_to_send - 1;
+
+ peer_node_name = port->peer_node_name;
+ peer_port_name = port->peer_port_name;
+
+ // If the port being closed still has unread messages, then we need to take
+ // care to close the ports referenced by those messages so as to avoid
+ // leaking memory.
+ port->message_queue.GetReferencedPorts(&referenced_port_names);
+ }
+
+ DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
+ << " to " << peer_port_name << "@" << peer_node_name;
+
+ ErasePort(port_ref.name());
+
+ delegate_->ForwardMessage(
+ peer_node_name,
+ NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
+
+ for (const auto& name : referenced_port_names) {
+ PortRef ref;
+ if (GetPort(name, &ref) == OK)
+ ClosePort(ref);
+ }
+ return OK;
+}
+
+int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
+ Port* port = port_ref.port();
+
+ base::AutoLock lock(port->lock);
+
+ if (port->state != Port::kReceiving)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ port_status->has_messages = port->message_queue.HasNextMessage();
+ port_status->receiving_messages = CanAcceptMoreMessages(port);
+ port_status->peer_closed = port->peer_closed;
+ return OK;
+}
+
+int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
+ return GetMessageIf(port_ref, nullptr, message);
+}
+
+int Node::GetMessageIf(const PortRef& port_ref,
+ std::function<bool(const Message&)> selector,
+ ScopedMessage* message) {
+ *message = nullptr;
+
+ DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;
+
+ Port* port = port_ref.port();
+ {
+ base::AutoLock lock(port->lock);
+
+ // This could also be treated like the port being unknown since the
+ // embedder should no longer be referring to a port that has been sent.
+ if (port->state != Port::kReceiving)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ // Let the embedder get messages until there are no more before reporting
+ // that the peer closed its end.
+ if (!CanAcceptMoreMessages(port))
+ return ERROR_PORT_PEER_CLOSED;
+
+ port->message_queue.GetNextMessageIf(selector, message);
+ }
+
+ // Allow referenced ports to trigger PortStatusChanged calls.
+ if (*message) {
+ for (size_t i = 0; i < (*message)->num_ports(); ++i) {
+ const PortName& new_port_name = (*message)->ports()[i];
+ scoped_refptr<Port> new_port = GetPort(new_port_name);
+
+ DCHECK(new_port) << "Port " << new_port_name << "@" << name_
+ << " does not exist!";
+
+ base::AutoLock lock(new_port->lock);
+
+ DCHECK(new_port->state == Port::kReceiving);
+ new_port->message_queue.set_signalable(true);
+ }
+ }
+
+ return OK;
+}
+
+int Node::SendMessage(const PortRef& port_ref, ScopedMessage* message) {
+ ScopedMessage& m = *message;
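+ // Refuse to send a port in a message over that same port.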
+ for (size_t i = 0; i < m->num_ports(); ++i) {
+ if (m->ports()[i] == port_ref.name())
+ return ERROR_PORT_CANNOT_SEND_SELF;
+ }
+
+ Port* port = port_ref.port();
+ {
+ // We must acquire |ports_lock_| before grabbing any port locks, because
+ // WillSendMessage_Locked may need to lock multiple ports out of order.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock lock(port->lock);
+
+ if (port->state != Port::kReceiving)
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ if (port->peer_closed)
+ return ERROR_PORT_PEER_CLOSED;
+
+ int rv = WillSendMessage_Locked(port, port_ref.name(), m.get());
+ if (rv != OK)
+ return rv;
+
+ // Beyond this point there's no sense in returning anything but OK. Even if
+ // message forwarding or acceptance fails, there's nothing the embedder can
+ // do to recover. Assume that failure beyond this point must be treated as a
+ // transport failure.
+
+ if (port->peer_node_name != name_) {
+ delegate_->ForwardMessage(port->peer_node_name, std::move(m));
+ return OK;
+ }
+ }
+
+ int rv = AcceptMessage(std::move(m));
+ if (rv != OK) {
+ // See comment above for why we don't return an error in this case.
+ DVLOG(2) << "AcceptMessage failed: " << rv;
+ }
+
+ return OK;
+}
+
+int Node::AcceptMessage(ScopedMessage message) {
+ const EventHeader* header = GetEventHeader(*message);
+ switch (header->type) {
+ case EventType::kUser:
+ return OnUserMessage(std::move(message));
+ case EventType::kPortAccepted:
+ return OnPortAccepted(header->port_name);
+ case EventType::kObserveProxy:
+ return OnObserveProxy(
+ header->port_name,
+ *GetEventData<ObserveProxyEventData>(*message));
+ case EventType::kObserveProxyAck:
+ return OnObserveProxyAck(
+ header->port_name,
+ GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
+ case EventType::kObserveClosure:
+ return OnObserveClosure(
+ header->port_name,
+ GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
+ }
+ return OOPS(ERROR_NOT_IMPLEMENTED);
+}
+
+int Node::LostConnectionToNode(const NodeName& node_name) {
+ // We can no longer send events to the given node. We also can't expect any
+ // PortAccepted events.
+
+ DVLOG(1) << "Observing lost connection from node " << name_
+ << " to node " << node_name;
+
+ std::vector<PortRef> ports_to_notify;
+
+ {
+ base::AutoLock ports_lock(ports_lock_);
+
+ for (auto iter = ports_.begin(); iter != ports_.end(); ) {
+ scoped_refptr<Port>& port = iter->second;
+
+ bool remove_port = false;
+ {
+ base::AutoLock port_lock(port->lock);
+
+ if (port->peer_node_name == node_name) {
+ // We can no longer send messages to this port's peer. We assume we
+ // will not receive any more messages from this port's peer as well.
+ if (!port->peer_closed) {
+ port->peer_closed = true;
+ port->last_sequence_num_to_receive =
+ port->message_queue.next_sequence_num() - 1;
+
+ if (port->state == Port::kReceiving)
+ ports_to_notify.push_back(PortRef(iter->first, port));
+ }
+
+ // We do not expect to forward any further messages, and we do not
+ // expect to receive a Port{Accepted,Rejected} event.
+ if (port->state != Port::kReceiving)
+ remove_port = true;
+ }
+ }
+
+ if (remove_port) {
+ DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
+ iter = ports_.erase(iter);
+ } else {
+ ++iter;
+ }
+ }
+ }
+
+ for (size_t i = 0; i < ports_to_notify.size(); ++i)
+ delegate_->PortStatusChanged(ports_to_notify[i]);
+
+ return OK;
+}
+
+int Node::OnUserMessage(ScopedMessage message) {
+ PortName port_name = GetEventHeader(*message)->port_name;
+ const auto* event = GetEventData<UserEventData>(*message);
+
+#if !defined(NDEBUG)
+ std::ostringstream ports_buf;
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ if (i > 0)
+ ports_buf << ",";
+ ports_buf << message->ports()[i];
+ }
+
+ DVLOG(2) << "AcceptMessage " << event->sequence_num
+ << " [ports=" << ports_buf.str() << "] at "
+ << port_name << "@" << name_;
+#endif
+
+ scoped_refptr<Port> port = GetPort(port_name);
+
+ // Even if the destination port does not exist, can no longer receive
+ // messages, or is buffering or proxying messages, we still need the ports
+ // attached to this message to be bound to this node. If the message is
+ // forwarded, these ports will be transferred along with it in the usual way;
+ // if the message cannot be accepted, the newly bound ports will simply be
+ // closed.
+
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
+ if (rv != OK)
+ return rv;
+ }
+
+ bool has_next_message = false;
+ bool message_accepted = false;
+
+ if (port) {
+ // We may want to forward messages once the port lock is held, so we must
+ // acquire |ports_lock_| first.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock lock(port->lock);
+
+ // Reject spurious messages if we've already received the last expected
+ // message.
+ if (CanAcceptMoreMessages(port.get())) {
+ message_accepted = true;
+ port->message_queue.AcceptMessage(std::move(message), &has_next_message);
+
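+ // A buffering or proxying port must not signal the embedder about new
+ // messages; they are held until PortAccepted or forwarded on, respectively.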
+ if (port->state == Port::kBuffering) {
+ has_next_message = false;
+ } else if (port->state == Port::kProxying) {
+ has_next_message = false;
+
+ // Forward messages. We forward messages in sequential order here so
+ // that we maintain the message queue's notion of next sequence number.
+ // That's useful for the proxy removal process as we can tell when this
+ // port has seen all of the messages it is expected to see.
+ int rv = ForwardMessages_Locked(port.get(), port_name);
+ if (rv != OK)
+ return rv;
+
+ MaybeRemoveProxy_Locked(port.get(), port_name);
+ }
+ }
+ }
+
+ if (!message_accepted) {
+ DVLOG(2) << "Message not accepted!";
+ // Close all newly accepted ports as they are effectively orphaned.
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ PortRef port_ref;
+ if (GetPort(message->ports()[i], &port_ref) == OK) {
+ ClosePort(port_ref);
+ } else {
+ DLOG(WARNING) << "Cannot close non-existent port!";
+ }
+ }
+ } else if (has_next_message) {
+ PortRef port_ref(port_name, port);
+ delegate_->PortStatusChanged(port_ref);
+ }
+
+ return OK;
+}
+
+int Node::OnPortAccepted(const PortName& port_name) {
+ scoped_refptr<Port> port = GetPort(port_name);
+ if (!port)
+ return OOPS(ERROR_PORT_UNKNOWN);
+
+ {
+ // We must hold |ports_lock_| before grabbing the port lock because
+ // ForwardMessages_Locked requires it to be held.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock lock(port->lock);
+
+ DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
+ << " pointing to "
+ << port->peer_port_name << "@" << port->peer_node_name;
+
+ if (port->state != Port::kBuffering)
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+
+ port->state = Port::kProxying;
+
+ int rv = ForwardMessages_Locked(port.get(), port_name);
+ if (rv != OK)
+ return rv;
+
+ // We may have observed closure before receiving PortAccepted. In that
+ // case, we can advance to removing the proxy without sending out an
+ // ObserveProxy message. We already know the last expected message, etc.
+
+ if (port->remove_proxy_on_last_message) {
+ MaybeRemoveProxy_Locked(port.get(), port_name);
+
+ // Make sure we propagate closure to our current peer.
+ ObserveClosureEventData data;
+ data.last_sequence_num = port->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port->peer_node_name,
+ NewInternalMessage(port->peer_port_name,
+ EventType::kObserveClosure, data));
+ } else {
+ InitiateProxyRemoval_Locked(port.get(), port_name);
+ }
+ }
+ return OK;
+}
+
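+// If this port's peer is the proxy named in the event, reroute the peer to
+// point past the proxy and send back an ObserveProxyAck; otherwise forward
+// the event along toward the port that does refer to the proxy.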
+int Node::OnObserveProxy(const PortName& port_name,
+ const ObserveProxyEventData& event) {
+ // The port may have already been closed locally, in which case the
+ // ObserveClosure message will contain the last_sequence_num field.
+ // We can then silently ignore this message.
+ scoped_refptr<Port> port = GetPort(port_name);
+ if (!port) {
+ DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
+ return OK;
+ }
+
+ DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
+ << event.proxy_port_name << "@"
+ << event.proxy_node_name << " pointing to "
+ << event.proxy_to_port_name << "@"
+ << event.proxy_to_node_name;
+
+ {
+ base::AutoLock lock(port->lock);
+
+ if (port->peer_node_name == event.proxy_node_name &&
+ port->peer_port_name == event.proxy_port_name) {
+ if (port->state == Port::kReceiving) {
+ port->peer_node_name = event.proxy_to_node_name;
+ port->peer_port_name = event.proxy_to_port_name;
+
+ ObserveProxyAckEventData ack;
+ ack.last_sequence_num = port->next_sequence_num_to_send - 1;
+
+ delegate_->ForwardMessage(
+ event.proxy_node_name,
+ NewInternalMessage(event.proxy_port_name,
+ EventType::kObserveProxyAck,
+ ack));
+ } else {
+ // As a proxy ourselves, we don't know how to honor the ObserveProxy
+ // event or to populate the last_sequence_num field of ObserveProxyAck.
+ // After all, another port could be sending messages to our peer now
+ // that we've sent out our own ObserveProxy event. Instead, we will
+ // send an ObserveProxyAck indicating that the ObserveProxy event
+ // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
+ // However, this has to be done after we are removed as a proxy.
+ // Otherwise, we might just find ourselves back here again, which
+ // would be akin to a busy loop.
+
+ DVLOG(2) << "Delaying ObserveProxyAck to "
+ << event.proxy_port_name << "@" << event.proxy_node_name;
+
+ ObserveProxyAckEventData ack;
+ ack.last_sequence_num = kInvalidSequenceNum;
+
+ port->send_on_proxy_removal.reset(
+ new std::pair<NodeName, ScopedMessage>(
+ event.proxy_node_name,
+ NewInternalMessage(event.proxy_port_name,
+ EventType::kObserveProxyAck,
+ ack)));
+ }
+ } else {
+ // Forward this event along to our peer. Eventually, it should find the
+ // port referring to the proxy.
+ delegate_->ForwardMessage(
+ port->peer_node_name,
+ NewInternalMessage(port->peer_port_name,
+ EventType::kObserveProxy,
+ event));
+ }
+ }
+ return OK;
+}
+
+int Node::OnObserveProxyAck(const PortName& port_name,
+ uint64_t last_sequence_num) {
+ DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
+ << " (last_sequence_num=" << last_sequence_num << ")";
+
+ scoped_refptr<Port> port = GetPort(port_name);
+ // The port may have observed closure first, so this is not an "Oops".
+ if (!port)
+ return ERROR_PORT_UNKNOWN;
+
+ {
+ // We must acquire |ports_lock_| before the port lock because it must be
+ // held for MaybeRemoveProxy_Locked.
+ base::AutoLock ports_lock(ports_lock_);
+
+ base::AutoLock lock(port->lock);
+
+ if (port->state != Port::kProxying)
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+
+ if (last_sequence_num == kInvalidSequenceNum) {
+ // Send again.
+ InitiateProxyRemoval_Locked(port.get(), port_name);
+ return OK;
+ }
+
+ // We can now remove this port once we have received and forwarded the last
+ // message addressed to this port.
+ port->remove_proxy_on_last_message = true;
+ port->last_sequence_num_to_receive = last_sequence_num;
+
+ MaybeRemoveProxy_Locked(port.get(), port_name);
+ }
+ return OK;
+}
+
+int Node::OnObserveClosure(const PortName& port_name,
+ uint64_t last_sequence_num) {
+ // OK if the port doesn't exist, as it may have been closed already.
+ scoped_refptr<Port> port = GetPort(port_name);
+ if (!port)
+ return OK;
+
+ // This message tells the port that it should no longer expect more messages
+ // beyond last_sequence_num. This message is forwarded along until we reach
+ // the receiving end, and this message serves as an equivalent to
+ // ObserveProxyAck.
+
+ bool notify_delegate = false;
+ {
+ // We must acquire |ports_lock_| before the port lock because it must be
+ // held for MaybeRemoveProxy_Locked.
+ base::AutoLock ports_lock(ports_lock_);
+
+ base::AutoLock lock(port->lock);
+
+ port->peer_closed = true;
+ port->last_sequence_num_to_receive = last_sequence_num;
+
+ DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
+ << " (state=" << port->state << ") pointing to "
+ << port->peer_port_name << "@" << port->peer_node_name
+ << " (last_sequence_num=" << last_sequence_num << ")";
+
+ // We always forward ObserveClosure, even beyond the receiving port which
+ // cares about it. This ensures that any dead-end proxies beyond that port
+ // are notified to remove themselves.
+
+ ObserveClosureEventData forwarded_data;
+
+ if (port->state == Port::kReceiving) {
+ notify_delegate = true;
+
+ // When forwarding along the other half of the port cycle, this will only
+ // reach dead-end proxies. Tell them we've sent our last message so they
+ // can go away.
+ //
+ // TODO: Repurposing ObserveClosure for this has the desired result but
+ // may be semantically confusing since the forwarding port is not actually
+ // closed. Consider replacing this with a new event type.
+ forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
+ } else {
+ // We haven't yet reached the receiving peer of the closed port, so
+ // forward the message along as-is.
+ forwarded_data.last_sequence_num = last_sequence_num;
+
+ // Try to remove this port now if it is a proxy, since our peer will not be
+ // able to participate in the proxy removal handshake.
+ port->remove_proxy_on_last_message = true;
+ if (port->state == Port::kProxying)
+ MaybeRemoveProxy_Locked(port.get(), port_name);
+ }
+
+ DVLOG(2) << "Forwarding ObserveClosure from "
+ << port_name << "@" << name_ << " to peer "
+ << port->peer_port_name << "@" << port->peer_node_name
+ << " (last_sequence_num=" << forwarded_data.last_sequence_num
+ << ")";
+
+ delegate_->ForwardMessage(
+ port->peer_node_name,
+ NewInternalMessage(port->peer_port_name,
+ EventType::kObserveClosure, forwarded_data));
+ }
+ if (notify_delegate) {
+ PortRef port_ref(port_name, port);
+ delegate_->PortStatusChanged(port_ref);
+ }
+ return OK;
+}
+
+int Node::AddPortWithName(const PortName& port_name,
+ const scoped_refptr<Port>& port) {
+ base::AutoLock lock(ports_lock_);
+
+ if (!ports_.insert(std::make_pair(port_name, port)).second)
+ return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
+
+ DVLOG(2) << "Created port " << port_name << "@" << name_;
+ return OK;
+}
+
+void Node::ErasePort(const PortName& port_name) {
+ base::AutoLock lock(ports_lock_);
+ return ErasePort_Locked(port_name);
+}
+
+void Node::ErasePort_Locked(const PortName& port_name) {
+ ports_lock_.AssertAcquired();
+ ports_.erase(port_name);
+ DVLOG(2) << "Deleted port " << port_name << "@" << name_;
+}
+
+scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
+ base::AutoLock lock(ports_lock_);
+ return GetPort_Locked(port_name);
+}
+
+scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
+ ports_lock_.AssertAcquired();
+ auto iter = ports_.find(port_name);
+ if (iter == ports_.end())
+ return nullptr;
+
+ return iter->second;
+}
+
+void Node::WillSendPort_Locked(Port* port,
+ const NodeName& to_node_name,
+ PortName* port_name,
+ PortDescriptor* port_descriptor) {
+ ports_lock_.AssertAcquired();
+ port->lock.AssertAcquired();
+
+ PortName local_port_name = *port_name;
+
+ PortName new_port_name;
+ delegate_->GenerateRandomPortName(&new_port_name);
+
+ // Make sure we don't send messages to the new peer until after we know it
+ // exists. In the meantime, just buffer messages locally.
+ DCHECK(port->state == Port::kReceiving);
+ port->state = Port::kBuffering;
+
+ // If we already know our peer is closed, we already know this proxy can
+ // be removed once it receives and forwards its last expected message.
+ if (port->peer_closed)
+ port->remove_proxy_on_last_message = true;
+
+ *port_name = new_port_name;
+
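+ // Snapshot this port's routing and sequencing state so that the receiving
+ // node can reconstruct an equivalent port.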
+ port_descriptor->peer_node_name = port->peer_node_name;
+ port_descriptor->peer_port_name = port->peer_port_name;
+ port_descriptor->referring_node_name = name_;
+ port_descriptor->referring_port_name = local_port_name;
+ port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
+ port_descriptor->next_sequence_num_to_receive =
+ port->message_queue.next_sequence_num();
+ port_descriptor->last_sequence_num_to_receive =
+ port->last_sequence_num_to_receive;
+ port_descriptor->peer_closed = port->peer_closed;
+
+ // Configure the local port to point to the new port.
+ port->peer_node_name = to_node_name;
+ port->peer_port_name = new_port_name;
+}
+
+int Node::AcceptPort(const PortName& port_name,
+ const PortDescriptor& port_descriptor) {
+ scoped_refptr<Port> port = make_scoped_refptr(
+ new Port(port_descriptor.next_sequence_num_to_send,
+ port_descriptor.next_sequence_num_to_receive));
+ port->state = Port::kReceiving;
+ port->peer_node_name = port_descriptor.peer_node_name;
+ port->peer_port_name = port_descriptor.peer_port_name;
+ port->last_sequence_num_to_receive =
+ port_descriptor.last_sequence_num_to_receive;
+ port->peer_closed = port_descriptor.peer_closed;
+
+ DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
+ << port->peer_closed << "; last_sequence_num_to_receive="
+ << port->last_sequence_num_to_receive << "]";
+
+ // A newly accepted port is not signalable until the message referencing the
+ // new port finds its way to the consumer (see GetMessageIf).
+ port->message_queue.set_signalable(false);
+
+ int rv = AddPortWithName(port_name, port);
+ if (rv != OK)
+ return rv;
+
+ // Allow referring port to forward messages.
+ delegate_->ForwardMessage(
+ port_descriptor.referring_node_name,
+ NewInternalMessage(port_descriptor.referring_port_name,
+ EventType::kPortAccepted));
+ return OK;
+}
+
+int Node::WillSendMessage_Locked(Port* port,
+ const PortName& port_name,
+ Message* message) {
+ ports_lock_.AssertAcquired();
+ port->lock.AssertAcquired();
+
+ DCHECK(message);
+
+ // Messages may already have a sequence number if they're being forwarded
+ // by a proxy. Otherwise, use the next outgoing sequence number.
+ uint64_t* sequence_num =
+ &GetMutableEventData<UserEventData>(message)->sequence_num;
+ if (*sequence_num == 0)
+ *sequence_num = port->next_sequence_num_to_send++;
+
+#if !defined(NDEBUG)
+ std::ostringstream ports_buf;
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ if (i > 0)
+ ports_buf << ",";
+ ports_buf << message->ports()[i];
+ }
+#endif
+
+ if (message->num_ports() > 0) {
+ // Note: Another thread could be trying to send the same ports, so we need
+ // to ensure that they are ours to send before we mutate their state.
+
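+ // Holding |ports_lock_| (acquired in SendMessage) ensures that no other
+ // thread is concurrently acquiring multiple port locks, so taking these
+ // locks out of order cannot deadlock.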
+ std::vector<scoped_refptr<Port>> ports;
+ ports.resize(message->num_ports());
+
+ {
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ ports[i] = GetPort_Locked(message->ports()[i]);
+ ports[i]->lock.Acquire();
+
+ int error = OK;
+ if (ports[i]->state != Port::kReceiving)
+ error = ERROR_PORT_STATE_UNEXPECTED;
+ else if (message->ports()[i] == port->peer_port_name)
+ error = ERROR_PORT_CANNOT_SEND_PEER;
+
+ if (error != OK) {
+ // Oops, we cannot send this port.
+ for (size_t j = 0; j <= i; ++j)
+ ports[j]->lock.Release();
+ // Backpedal on the sequence number.
+ port->next_sequence_num_to_send--;
+ return error;
+ }
+ }
+ }
+
+ PortDescriptor* port_descriptors =
+ GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
+
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ WillSendPort_Locked(ports[i].get(),
+ port->peer_node_name,
+ message->mutable_ports() + i,
+ port_descriptors + i);
+ }
+
+ for (size_t i = 0; i < message->num_ports(); ++i)
+ ports[i]->lock.Release();
+ }
+
+#if !defined(NDEBUG)
+ DVLOG(2) << "Sending message "
+ << GetEventData<UserEventData>(*message)->sequence_num
+ << " [ports=" << ports_buf.str() << "]"
+ << " from " << port_name << "@" << name_
+ << " to " << port->peer_port_name << "@" << port->peer_node_name;
+#endif
+
+ GetMutableEventHeader(message)->port_name = port->peer_port_name;
+ return OK;
+}
+
+int Node::ForwardMessages_Locked(Port* port, const PortName& port_name) {
+ ports_lock_.AssertAcquired();
+ port->lock.AssertAcquired();
+
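+ // Drain the message queue in sequence order, forwarding each message on to
+ // the current peer.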
+ for (;;) {
+ ScopedMessage message;
+ port->message_queue.GetNextMessageIf(nullptr, &message);
+ if (!message)
+ break;
+
+ int rv = WillSendMessage_Locked(port, port_name, message.get());
+ if (rv != OK)
+ return rv;
+
+ delegate_->ForwardMessage(port->peer_node_name, std::move(message));
+ }
+ return OK;
+}
+
+void Node::InitiateProxyRemoval_Locked(Port* port,
+ const PortName& port_name) {
+ port->lock.AssertAcquired();
+
+ // To remove this proxy port, we start by notifying the connected graph that
+ // this port is a proxy. This allows whatever port is referencing it to skip
+ // over it. Eventually, this port will receive ObserveProxyAck (or
+ // ObserveClosure if the peer was closed in the meantime).
+
+ ObserveProxyEventData data;
+ data.proxy_node_name = name_;
+ data.proxy_port_name = port_name;
+ data.proxy_to_node_name = port->peer_node_name;
+ data.proxy_to_port_name = port->peer_port_name;
+
+ delegate_->ForwardMessage(
+ port->peer_node_name,
+ NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
+}
+
+void Node::MaybeRemoveProxy_Locked(Port* port,
+ const PortName& port_name) {
+ // |ports_lock_| must be held so we can potentially ErasePort_Locked().
+ ports_lock_.AssertAcquired();
+ port->lock.AssertAcquired();
+
+ DCHECK(port->state == Port::kProxying);
+
+ // Make sure we have seen ObserveProxyAck (or ObserveClosure) before
+ // removing the port.
+ if (!port->remove_proxy_on_last_message)
+ return;
+
+ if (!CanAcceptMoreMessages(port)) {
+ // This proxy port is done. We can now remove it!
+ ErasePort_Locked(port_name);
+
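+ // Deliver any deferred message, such as the ObserveProxyAck stashed by
+ // OnObserveProxy while this port was still an active proxy.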
+ if (port->send_on_proxy_removal) {
+ NodeName to_node = port->send_on_proxy_removal->first;
+ ScopedMessage& message = port->send_on_proxy_removal->second;
+
+ delegate_->ForwardMessage(to_node, std::move(message));
+ }
+ } else {
+ DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
+ << " now; waiting for more messages";
+ }
+}
+
+ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
+ const EventType& type,
+ const void* data,
+ size_t num_data_bytes) {
+ ScopedMessage message;
+ delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
+
+ EventHeader* header = GetMutableEventHeader(message.get());
+ header->port_name = port_name;
+ header->type = type;
+ header->padding = 0;
+
+ if (num_data_bytes)
+ memcpy(header + 1, data, num_data_bytes);
+
+ return message;
+}
+
+} // namespace ports
+} // namespace edk
+} // namespace mojo
