| Index: mojo/edk/system/ports/node.cc
|
| diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..1718306fc672bab1f7e0660fd10fcaa732cbb0c1
|
| --- /dev/null
|
| +++ b/mojo/edk/system/ports/node.cc
|
| @@ -0,0 +1,998 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/edk/system/ports/node.h"
|
| +
|
| +#include <string.h>
|
| +
|
| +#include "base/logging.h"
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "mojo/edk/system/ports/node_delegate.h"
|
| +
|
| +namespace mojo {
|
| +namespace edk {
|
| +namespace ports {
|
| +
|
| +namespace {
|
| +
|
| +int DebugError(const char* message, int error_code) {
|
| + CHECK(false) << "Oops: " << message;
|
| + return error_code;
|
| +}
|
| +
|
| +#define OOPS(x) DebugError(#x, x)
|
| +
|
| +bool CanAcceptMoreMessages(const Port* port) {
|
| + // Have we already doled out the last message (i.e., do we expect to NOT
|
| + // receive further messages)?
|
| + uint64_t next_sequence_num = port->message_queue.next_sequence_num();
|
| + if (port->peer_closed || port->remove_proxy_on_last_message) {
|
| + if (port->last_sequence_num_to_receive == next_sequence_num - 1)
|
| + return false;
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +} // namespace
|
| +
|
| +Node::Node(const NodeName& name, NodeDelegate* delegate)
|
| + : name_(name),
|
| + delegate_(delegate) {
|
| +}
|
| +
|
| +Node::~Node() {
|
| + if (!ports_.empty())
|
| + DLOG(WARNING) << "Unclean shutdown for node " << name_;
|
| +}
|
| +
|
| +bool Node::CanShutdownCleanly(bool allow_local_ports) {
|
| + base::AutoLock ports_lock(ports_lock_);
|
| +
|
| + if (!allow_local_ports) {
|
| +#if !defined(NDEBUG)
|
| + for (auto entry : ports_) {
|
| + DVLOG(2) << "Port " << entry.first << " referencing node "
|
| + << entry.second->peer_node_name << " is blocking shutdown of "
|
| + << "node " << name_ << " (state=" << entry.second->state << ")";
|
| + }
|
| +#endif
|
| + return ports_.empty();
|
| + }
|
| +
|
| + // NOTE: This is not efficient, though it probably doesn't need to be since
|
| + // relatively few ports should be open during shutdown and shutdown doesn't
|
| + // need to be blazingly fast.
|
| + bool can_shutdown = true;
|
| + for (auto entry : ports_) {
|
| + base::AutoLock lock(entry.second->lock);
|
| + if (entry.second->peer_node_name != name_ &&
|
| + entry.second->state != Port::kReceiving) {
|
| + can_shutdown = false;
|
| +#if !defined(NDEBUG)
|
| + DVLOG(2) << "Port " << entry.first << " referencing node "
|
| + << entry.second->peer_node_name << " is blocking shutdown of "
|
| + << "node " << name_ << " (state=" << entry.second->state << ")";
|
| +#else
|
| + // Exit early when not debugging.
|
| + break;
|
| +#endif
|
| + }
|
| + }
|
| +
|
| + return can_shutdown;
|
| +}
|
| +
|
| +int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
|
| + scoped_refptr<Port> port = GetPort(port_name);
|
| + if (!port)
|
| + return ERROR_PORT_UNKNOWN;
|
| +
|
| + *port_ref = PortRef(port_name, std::move(port));
|
| + return OK;
|
| +}
|
| +
|
| +int Node::CreateUninitializedPort(PortRef* port_ref) {
|
| + PortName port_name;
|
| + delegate_->GenerateRandomPortName(&port_name);
|
| +
|
| + scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
|
| + kInitialSequenceNum));
|
| + int rv = AddPortWithName(port_name, port);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + *port_ref = PortRef(port_name, std::move(port));
|
| + return OK;
|
| +}
|
| +
|
| +int Node::InitializePort(const PortRef& port_ref,
|
| + const NodeName& peer_node_name,
|
| + const PortName& peer_port_name) {
|
| + Port* port = port_ref.port();
|
| +
|
| + {
|
| + base::AutoLock lock(port->lock);
|
| + if (port->state != Port::kUninitialized)
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + port->state = Port::kReceiving;
|
| + port->peer_node_name = peer_node_name;
|
| + port->peer_port_name = peer_port_name;
|
| + }
|
| +
|
| + delegate_->PortStatusChanged(port_ref);
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
|
| + int rv;
|
| +
|
| + rv = CreateUninitializedPort(port0_ref);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + rv = CreateUninitializedPort(port1_ref);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + rv = InitializePort(*port0_ref, name_, port1_ref->name());
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + rv = InitializePort(*port1_ref, name_, port0_ref->name());
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::SetUserData(const PortRef& port_ref,
|
| + const scoped_refptr<UserData>& user_data) {
|
| + Port* port = port_ref.port();
|
| +
|
| + base::AutoLock lock(port->lock);
|
| + if (port->state == Port::kClosed)
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + port->user_data = std::move(user_data);
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::GetUserData(const PortRef& port_ref,
|
| + scoped_refptr<UserData>* user_data) {
|
| + Port* port = port_ref.port();
|
| +
|
| + base::AutoLock lock(port->lock);
|
| + if (port->state == Port::kClosed)
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + *user_data = port->user_data;
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::ClosePort(const PortRef& port_ref) {
|
| + std::deque<PortName> referenced_port_names;
|
| +
|
| + ObserveClosureEventData data;
|
| +
|
| + NodeName peer_node_name;
|
| + PortName peer_port_name;
|
| + Port* port = port_ref.port();
|
| + {
|
| + // We may need to erase the port, which requires ports_lock_ to be held,
|
| + // but ports_lock_ must be acquired before any individual port locks.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| +
|
| + base::AutoLock lock(port->lock);
|
| + if (port->state == Port::kUninitialized) {
|
| + // If the port was not yet initialized, there's nothing interesting to do.
|
| + ErasePort_Locked(port_ref.name());
|
| + return OK;
|
| + }
|
| +
|
| + if (port->state != Port::kReceiving)
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + port->state = Port::kClosed;
|
| +
|
| + // We pass along the sequence number of the last message sent from this
|
| + // port to allow the peer to have the opportunity to consume all inbound
|
| + // messages before notifying the embedder that this port is closed.
|
| + data.last_sequence_num = port->next_sequence_num_to_send - 1;
|
| +
|
| + peer_node_name = port->peer_node_name;
|
| + peer_port_name = port->peer_port_name;
|
| +
|
| + // If the port being closed still has unread messages, then we need to take
|
| + // care to close those ports so as to avoid leaking memory.
|
| + port->message_queue.GetReferencedPorts(&referenced_port_names);
|
| + }
|
| +
|
| + DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
|
| + << " to " << peer_port_name << "@" << peer_node_name;
|
| +
|
| + ErasePort(port_ref.name());
|
| +
|
| + delegate_->ForwardMessage(
|
| + peer_node_name,
|
| + NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
|
| +
|
| + for (const auto& name : referenced_port_names) {
|
| + PortRef ref;
|
| + if (GetPort(name, &ref) == OK)
|
| + ClosePort(ref);
|
| + }
|
| + return OK;
|
| +}
|
| +
|
| +int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
|
| + Port* port = port_ref.port();
|
| +
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + if (port->state != Port::kReceiving)
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + port_status->has_messages = port->message_queue.HasNextMessage();
|
| + port_status->receiving_messages = CanAcceptMoreMessages(port);
|
| + port_status->peer_closed = port->peer_closed;
|
| + return OK;
|
| +}
|
| +
|
| +int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
|
| + return GetMessageIf(port_ref, nullptr, message);
|
| +}
|
| +
|
| +int Node::GetMessageIf(const PortRef& port_ref,
|
| + std::function<bool(const Message&)> selector,
|
| + ScopedMessage* message) {
|
| + *message = nullptr;
|
| +
|
| + DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;
|
| +
|
| + Port* port = port_ref.port();
|
| + {
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + // This could also be treated like the port being unknown since the
|
| + // embedder should no longer be referring to a port that has been sent.
|
| + if (port->state != Port::kReceiving)
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + // Let the embedder get messages until there are no more before reporting
|
| + // that the peer closed its end.
|
| + if (!CanAcceptMoreMessages(port))
|
| + return ERROR_PORT_PEER_CLOSED;
|
| +
|
| + port->message_queue.GetNextMessageIf(selector, message);
|
| + }
|
| +
|
| + // Allow referenced ports to trigger PortStatusChanged calls.
|
| + if (*message) {
|
| + for (size_t i = 0; i < (*message)->num_ports(); ++i) {
|
| + const PortName& new_port_name = (*message)->ports()[i];
|
| + scoped_refptr<Port> new_port = GetPort(new_port_name);
|
| +
|
| + DCHECK(new_port) << "Port " << new_port_name << "@" << name_
|
| + << " does not exist!";
|
| +
|
| + base::AutoLock lock(new_port->lock);
|
| +
|
| + DCHECK(new_port->state == Port::kReceiving);
|
| + new_port->message_queue.set_signalable(true);
|
| + }
|
| + }
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::SendMessage(const PortRef& port_ref, ScopedMessage* message) {
|
| + ScopedMessage& m = *message;
|
| + for (size_t i = 0; i < m->num_ports(); ++i) {
|
| + if (m->ports()[i] == port_ref.name())
|
| + return ERROR_PORT_CANNOT_SEND_SELF;
|
| + }
|
| +
|
| + Port* port = port_ref.port();
|
| + {
|
| + // We must acquire |ports_lock_| before grabbing any port locks, because
|
| + // WillSendMessage_Locked may need to lock multiple ports out of order.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + if (port->state != Port::kReceiving)
|
| + return ERROR_PORT_STATE_UNEXPECTED;
|
| +
|
| + if (port->peer_closed)
|
| + return ERROR_PORT_PEER_CLOSED;
|
| +
|
| + int rv = WillSendMessage_Locked(port, port_ref.name(), m.get());
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + // Beyond this point there's no sense in returning anything but OK. Even if
|
| + // message forwarding or acceptance fails, there's nothing the embedder can
|
| + // do to recover. Assume that failure beyond this point must be treated as a
|
| + // transport failure.
|
| +
|
| + if (port->peer_node_name != name_) {
|
| + delegate_->ForwardMessage(port->peer_node_name, std::move(m));
|
| + return OK;
|
| + }
|
| + }
|
| +
|
| + int rv = AcceptMessage(std::move(m));
|
| + if (rv != OK) {
|
| + // See comment above for why we don't return an error in this case.
|
| + DVLOG(2) << "AcceptMessage failed: " << rv;
|
| + }
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::AcceptMessage(ScopedMessage message) {
|
| + const EventHeader* header = GetEventHeader(*message);
|
| + switch (header->type) {
|
| + case EventType::kUser:
|
| + return OnUserMessage(std::move(message));
|
| + case EventType::kPortAccepted:
|
| + return OnPortAccepted(header->port_name);
|
| + case EventType::kObserveProxy:
|
| + return OnObserveProxy(
|
| + header->port_name,
|
| + *GetEventData<ObserveProxyEventData>(*message));
|
| + case EventType::kObserveProxyAck:
|
| + return OnObserveProxyAck(
|
| + header->port_name,
|
| + GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
|
| + case EventType::kObserveClosure:
|
| + return OnObserveClosure(
|
| + header->port_name,
|
| + GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
|
| + }
|
| + return OOPS(ERROR_NOT_IMPLEMENTED);
|
| +}
|
| +
|
| +int Node::LostConnectionToNode(const NodeName& node_name) {
|
| + // We can no longer send events to the given node. We also can't expect any
|
| + // PortAccepted events.
|
| +
|
| + DVLOG(1) << "Observing lost connection from node " << name_
|
| + << " to node " << node_name;
|
| +
|
| + std::vector<PortRef> ports_to_notify;
|
| +
|
| + {
|
| + base::AutoLock ports_lock(ports_lock_);
|
| +
|
| + for (auto iter = ports_.begin(); iter != ports_.end(); ) {
|
| + scoped_refptr<Port>& port = iter->second;
|
| +
|
| + bool remove_port = false;
|
| + {
|
| + base::AutoLock port_lock(port->lock);
|
| +
|
| + if (port->peer_node_name == node_name) {
|
| + // We can no longer send messages to this port's peer. We assume we
|
| + // will not receive any more messages from this port's peer as well.
|
| + if (!port->peer_closed) {
|
| + port->peer_closed = true;
|
| + port->last_sequence_num_to_receive =
|
| + port->message_queue.next_sequence_num() - 1;
|
| +
|
| + if (port->state == Port::kReceiving)
|
| + ports_to_notify.push_back(PortRef(iter->first, port));
|
| + }
|
| +
|
| + // We do not expect to forward any further messages, and we do not
|
| + // expect to receive a Port{Accepted,Rejected} event.
|
| + if (port->state != Port::kReceiving)
|
| + remove_port = true;
|
| + }
|
| + }
|
| +
|
| + if (remove_port) {
|
| + DVLOG(2) << "Deleted port " << iter->first << "@" << name_;
|
| + iter = ports_.erase(iter);
|
| + } else {
|
| + ++iter;
|
| + }
|
| + }
|
| + }
|
| +
|
| + for (size_t i = 0; i < ports_to_notify.size(); ++i)
|
| + delegate_->PortStatusChanged(ports_to_notify[i]);
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::OnUserMessage(ScopedMessage message) {
|
| + PortName port_name = GetEventHeader(*message)->port_name;
|
| + const auto* event = GetEventData<UserEventData>(*message);
|
| +
|
| +#if !defined(NDEBUG)
|
| + std::ostringstream ports_buf;
|
| + for (size_t i = 0; i < message->num_ports(); ++i) {
|
| + if (i > 0)
|
| + ports_buf << ",";
|
| + ports_buf << message->ports()[i];
|
| + }
|
| +
|
| + DVLOG(2) << "AcceptMessage " << event->sequence_num
|
| + << " [ports=" << ports_buf.str() << "] at "
|
| + << port_name << "@" << name_;
|
| +#endif
|
| +
|
| + scoped_refptr<Port> port = GetPort(port_name);
|
| +
|
| + // Even if this port does not exist, cannot receive anymore messages or is
|
| + // buffering or proxying messages, we still need these ports to be bound to
|
| + // this node. When the message is forwarded, these ports will get transferred
|
| + // following the usual method. If the message cannot be accepted, then the
|
| + // newly bound ports will simply be closed.
|
| +
|
| + for (size_t i = 0; i < message->num_ports(); ++i) {
|
| + int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
|
| + if (rv != OK)
|
| + return rv;
|
| + }
|
| +
|
| + bool has_next_message = false;
|
| + bool message_accepted = false;
|
| +
|
| + if (port) {
|
| + // We may want to forward messages once the port lock is held, so we must
|
| + // acquire |ports_lock_| first.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + // Reject spurious messages if we've already received the last expected
|
| + // message.
|
| + if (CanAcceptMoreMessages(port.get())) {
|
| + message_accepted = true;
|
| + port->message_queue.AcceptMessage(std::move(message), &has_next_message);
|
| +
|
| + if (port->state == Port::kBuffering) {
|
| + has_next_message = false;
|
| + } else if (port->state == Port::kProxying) {
|
| + has_next_message = false;
|
| +
|
| + // Forward messages. We forward messages in sequential order here so
|
| + // that we maintain the message queue's notion of next sequence number.
|
| + // That's useful for the proxy removal process as we can tell when this
|
| + // port has seen all of the messages it is expected to see.
|
| + int rv = ForwardMessages_Locked(port.get(), port_name);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + MaybeRemoveProxy_Locked(port.get(), port_name);
|
| + }
|
| + }
|
| + }
|
| +
|
| + if (!message_accepted) {
|
| + DVLOG(2) << "Message not accepted!\n";
|
| + // Close all newly accepted ports as they are effectively orphaned.
|
| + for (size_t i = 0; i < message->num_ports(); ++i) {
|
| + PortRef port_ref;
|
| + if (GetPort(message->ports()[i], &port_ref) == OK) {
|
| + ClosePort(port_ref);
|
| + } else {
|
| + DLOG(WARNING) << "Cannot close non-existent port!\n";
|
| + }
|
| + }
|
| + } else if (has_next_message) {
|
| + PortRef port_ref(port_name, port);
|
| + delegate_->PortStatusChanged(port_ref);
|
| + }
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int Node::OnPortAccepted(const PortName& port_name) {
|
| + scoped_refptr<Port> port = GetPort(port_name);
|
| + if (!port)
|
| + return OOPS(ERROR_PORT_UNKNOWN);
|
| +
|
| + {
|
| + // We must hold |ports_lock_| before grabbing the port lock because
|
| + // ForwardMessages_Locked requires it to be held.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
|
| + << " pointing to "
|
| + << port->peer_port_name << "@" << port->peer_node_name;
|
| +
|
| + if (port->state != Port::kBuffering)
|
| + return OOPS(ERROR_PORT_STATE_UNEXPECTED);
|
| +
|
| + port->state = Port::kProxying;
|
| +
|
| + int rv = ForwardMessages_Locked(port.get(), port_name);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + // We may have observed closure before receiving PortAccepted. In that
|
| + // case, we can advance to removing the proxy without sending out an
|
| + // ObserveProxy message. We already know the last expected message, etc.
|
| +
|
| + if (port->remove_proxy_on_last_message) {
|
| + MaybeRemoveProxy_Locked(port.get(), port_name);
|
| +
|
| + // Make sure we propagate closure to our current peer.
|
| + ObserveClosureEventData data;
|
| + data.last_sequence_num = port->last_sequence_num_to_receive;
|
| + delegate_->ForwardMessage(
|
| + port->peer_node_name,
|
| + NewInternalMessage(port->peer_port_name,
|
| + EventType::kObserveClosure, data));
|
| + } else {
|
| + InitiateProxyRemoval_Locked(port.get(), port_name);
|
| + }
|
| + }
|
| + return OK;
|
| +}
|
| +
|
| +int Node::OnObserveProxy(const PortName& port_name,
|
| + const ObserveProxyEventData& event) {
|
| + // The port may have already been closed locally, in which case the
|
| + // ObserveClosure message will contain the last_sequence_num field.
|
| + // We can then silently ignore this message.
|
| + scoped_refptr<Port> port = GetPort(port_name);
|
| + if (!port) {
|
| + DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
|
| + return OK;
|
| + }
|
| +
|
| + DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
|
| + << event.proxy_port_name << "@"
|
| + << event.proxy_node_name << " pointing to "
|
| + << event.proxy_to_port_name << "@"
|
| + << event.proxy_to_node_name;
|
| +
|
| + {
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + if (port->peer_node_name == event.proxy_node_name &&
|
| + port->peer_port_name == event.proxy_port_name) {
|
| + if (port->state == Port::kReceiving) {
|
| + port->peer_node_name = event.proxy_to_node_name;
|
| + port->peer_port_name = event.proxy_to_port_name;
|
| +
|
| + ObserveProxyAckEventData ack;
|
| + ack.last_sequence_num = port->next_sequence_num_to_send - 1;
|
| +
|
| + delegate_->ForwardMessage(
|
| + event.proxy_node_name,
|
| + NewInternalMessage(event.proxy_port_name,
|
| + EventType::kObserveProxyAck,
|
| + ack));
|
| + } else {
|
| + // As a proxy ourselves, we don't know how to honor the ObserveProxy
|
| + // event or to populate the last_sequence_num field of ObserveProxyAck.
|
| + // Afterall, another port could be sending messages to our peer now
|
| + // that we've sent out our own ObserveProxy event. Instead, we will
|
| + // send an ObserveProxyAck indicating that the ObserveProxy event
|
| + // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
|
| + // However, this has to be done after we are removed as a proxy.
|
| + // Otherwise, we might just find ourselves back here again, which
|
| + // would be akin to a busy loop.
|
| +
|
| + DVLOG(2) << "Delaying ObserveProxyAck to "
|
| + << event.proxy_port_name << "@" << event.proxy_node_name;
|
| +
|
| + ObserveProxyAckEventData ack;
|
| + ack.last_sequence_num = kInvalidSequenceNum;
|
| +
|
| + port->send_on_proxy_removal.reset(
|
| + new std::pair<NodeName, ScopedMessage>(
|
| + event.proxy_node_name,
|
| + NewInternalMessage(event.proxy_port_name,
|
| + EventType::kObserveProxyAck,
|
| + ack)));
|
| + }
|
| + } else {
|
| + // Forward this event along to our peer. Eventually, it should find the
|
| + // port referring to the proxy.
|
| + delegate_->ForwardMessage(
|
| + port->peer_node_name,
|
| + NewInternalMessage(port->peer_port_name,
|
| + EventType::kObserveProxy,
|
| + event));
|
| + }
|
| + }
|
| + return OK;
|
| +}
|
| +
|
| +int Node::OnObserveProxyAck(const PortName& port_name,
|
| + uint64_t last_sequence_num) {
|
| + DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
|
| + << " (last_sequence_num=" << last_sequence_num << ")";
|
| +
|
| + scoped_refptr<Port> port = GetPort(port_name);
|
| + if (!port)
|
| + return ERROR_PORT_UNKNOWN; // The port may have observed closure first, so
|
| + // this is not an "Oops".
|
| +
|
| + {
|
| + // We must acquire |ports_lock_| before the port lock because it must be
|
| + // held for MaybeRemoveProxy_Locked.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| +
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + if (port->state != Port::kProxying)
|
| + return OOPS(ERROR_PORT_STATE_UNEXPECTED);
|
| +
|
| + if (last_sequence_num == kInvalidSequenceNum) {
|
| + // Send again.
|
| + InitiateProxyRemoval_Locked(port.get(), port_name);
|
| + return OK;
|
| + }
|
| +
|
| + // We can now remove this port once we have received and forwarded the last
|
| + // message addressed to this port.
|
| + port->remove_proxy_on_last_message = true;
|
| + port->last_sequence_num_to_receive = last_sequence_num;
|
| +
|
| + MaybeRemoveProxy_Locked(port.get(), port_name);
|
| + }
|
| + return OK;
|
| +}
|
| +
|
| +int Node::OnObserveClosure(const PortName& port_name,
|
| + uint64_t last_sequence_num) {
|
| + // OK if the port doesn't exist, as it may have been closed already.
|
| + scoped_refptr<Port> port = GetPort(port_name);
|
| + if (!port)
|
| + return OK;
|
| +
|
| + // This message tells the port that it should no longer expect more messages
|
| + // beyond last_sequence_num. This message is forwarded along until we reach
|
| + // the receiving end, and this message serves as an equivalent to
|
| + // ObserveProxyAck.
|
| +
|
| + bool notify_delegate = false;
|
| + {
|
| + // We must acquire |ports_lock_| before the port lock because it must be
|
| + // held for MaybeRemoveProxy_Locked.
|
| + base::AutoLock ports_lock(ports_lock_);
|
| +
|
| + base::AutoLock lock(port->lock);
|
| +
|
| + port->peer_closed = true;
|
| + port->last_sequence_num_to_receive = last_sequence_num;
|
| +
|
| + DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
|
| + << " (state=" << port->state << ") pointing to "
|
| + << port->peer_port_name << "@" << port->peer_node_name
|
| + << " (last_sequence_num=" << last_sequence_num << ")";
|
| +
|
| + // We always forward ObserveClosure, even beyond the receiving port which
|
| + // cares about it. This ensures that any dead-end proxies beyond that port
|
| + // are notified to remove themselves.
|
| +
|
| + ObserveClosureEventData forwarded_data;
|
| +
|
| + if (port->state == Port::kReceiving) {
|
| + notify_delegate = true;
|
| +
|
| + // When forwarding along the other half of the port cycle, this will only
|
| + // reach dead-end proxies. Tell them we've sent our last message so they
|
| + // can go away.
|
| + //
|
| + // TODO: Repurposing ObserveClosure for this has the desired result but
|
| + // may be semantically confusing since the forwarding port is not actually
|
| + // closed. Consider replacing this with a new event type.
|
| + forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
|
| + } else {
|
| + // We haven't yet reached the receiving peer of the closed port, so
|
| + // forward the message along as-is.
|
| + forwarded_data.last_sequence_num = last_sequence_num;
|
| +
|
| + // See about removing the port if it is a proxy as our peer won't be able
|
| + // to participate in proxy removal.
|
| + port->remove_proxy_on_last_message = true;
|
| + if (port->state == Port::kProxying)
|
| + MaybeRemoveProxy_Locked(port.get(), port_name);
|
| + }
|
| +
|
| + DVLOG(2) << "Forwarding ObserveClosure from "
|
| + << port_name << "@" << name_ << " to peer "
|
| + << port->peer_port_name << "@" << port->peer_node_name
|
| + << " (last_sequence_num=" << forwarded_data.last_sequence_num
|
| + << ")";
|
| +
|
| + delegate_->ForwardMessage(
|
| + port->peer_node_name,
|
| + NewInternalMessage(port->peer_port_name,
|
| + EventType::kObserveClosure, forwarded_data));
|
| + }
|
| + if (notify_delegate) {
|
| + PortRef port_ref(port_name, port);
|
| + delegate_->PortStatusChanged(port_ref);
|
| + }
|
| + return OK;
|
| +}
|
| +
|
| +int Node::AddPortWithName(const PortName& port_name,
|
| + const scoped_refptr<Port>& port) {
|
| + base::AutoLock lock(ports_lock_);
|
| +
|
| + if (!ports_.insert(std::make_pair(port_name, port)).second)
|
| + return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
|
| +
|
| + DVLOG(2) << "Created port " << port_name << "@" << name_;
|
| + return OK;
|
| +}
|
| +
|
| +void Node::ErasePort(const PortName& port_name) {
|
| + base::AutoLock lock(ports_lock_);
|
| + return ErasePort_Locked(port_name);
|
| +}
|
| +
|
| +void Node::ErasePort_Locked(const PortName& port_name) {
|
| + ports_lock_.AssertAcquired();
|
| + ports_.erase(port_name);
|
| + DVLOG(2) << "Deleted port " << port_name << "@" << name_;
|
| +}
|
| +
|
| +scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
|
| + base::AutoLock lock(ports_lock_);
|
| + return GetPort_Locked(port_name);
|
| +}
|
| +
|
| +scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
|
| + ports_lock_.AssertAcquired();
|
| + auto iter = ports_.find(port_name);
|
| + if (iter == ports_.end())
|
| + return nullptr;
|
| +
|
| + return iter->second;
|
| +}
|
| +
|
| +void Node::WillSendPort_Locked(Port* port,
|
| + const NodeName& to_node_name,
|
| + PortName* port_name,
|
| + PortDescriptor* port_descriptor) {
|
| + ports_lock_.AssertAcquired();
|
| + port->lock.AssertAcquired();
|
| +
|
| + PortName local_port_name = *port_name;
|
| +
|
| + PortName new_port_name;
|
| + delegate_->GenerateRandomPortName(&new_port_name);
|
| +
|
| + // Make sure we don't send messages to the new peer until after we know it
|
| + // exists. In the meantime, just buffer messages locally.
|
| + DCHECK(port->state == Port::kReceiving);
|
| + port->state = Port::kBuffering;
|
| +
|
| + // If we already know our peer is closed, we already know this proxy can
|
| + // be removed once it receives and forwards its last expected message.
|
| + if (port->peer_closed)
|
| + port->remove_proxy_on_last_message = true;
|
| +
|
| + *port_name = new_port_name;
|
| +
|
| + port_descriptor->peer_node_name = port->peer_node_name;
|
| + port_descriptor->peer_port_name = port->peer_port_name;
|
| + port_descriptor->referring_node_name = name_;
|
| + port_descriptor->referring_port_name = local_port_name;
|
| + port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
|
| + port_descriptor->next_sequence_num_to_receive =
|
| + port->message_queue.next_sequence_num();
|
| + port_descriptor->last_sequence_num_to_receive =
|
| + port->last_sequence_num_to_receive;
|
| + port_descriptor->peer_closed = port->peer_closed;
|
| +
|
| + // Configure the local port to point to the new port.
|
| + port->peer_node_name = to_node_name;
|
| + port->peer_port_name = new_port_name;
|
| +}
|
| +
|
| +int Node::AcceptPort(const PortName& port_name,
|
| + const PortDescriptor& port_descriptor) {
|
| + scoped_refptr<Port> port = make_scoped_refptr(
|
| + new Port(port_descriptor.next_sequence_num_to_send,
|
| + port_descriptor.next_sequence_num_to_receive));
|
| + port->state = Port::kReceiving;
|
| + port->peer_node_name = port_descriptor.peer_node_name;
|
| + port->peer_port_name = port_descriptor.peer_port_name;
|
| + port->last_sequence_num_to_receive =
|
| + port_descriptor.last_sequence_num_to_receive;
|
| + port->peer_closed = port_descriptor.peer_closed;
|
| +
|
| + DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
|
| + << port->peer_closed << "; last_sequence_num_to_receive="
|
| + << port->last_sequence_num_to_receive << "]";
|
| +
|
| + // A newly accepted port is not signalable until the message referencing the
|
| + // new port finds its way to the consumer (see GetMessageIf).
|
| + port->message_queue.set_signalable(false);
|
| +
|
| + int rv = AddPortWithName(port_name, port);
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + // Allow referring port to forward messages.
|
| + delegate_->ForwardMessage(
|
| + port_descriptor.referring_node_name,
|
| + NewInternalMessage(port_descriptor.referring_port_name,
|
| + EventType::kPortAccepted));
|
| + return OK;
|
| +}
|
| +
|
| +int Node::WillSendMessage_Locked(Port* port,
|
| + const PortName& port_name,
|
| + Message* message) {
|
| + ports_lock_.AssertAcquired();
|
| + port->lock.AssertAcquired();
|
| +
|
| + DCHECK(message);
|
| +
|
| + // Messages may already have a sequence number if they're being forwarded
|
| + // by a proxy. Otherwise, use the next outgoing sequence number.
|
| + uint64_t* sequence_num =
|
| + &GetMutableEventData<UserEventData>(message)->sequence_num;
|
| + if (*sequence_num == 0)
|
| + *sequence_num = port->next_sequence_num_to_send++;
|
| +
|
| +#if !defined(NDEBUG)
|
| + std::ostringstream ports_buf;
|
| + for (size_t i = 0; i < message->num_ports(); ++i) {
|
| + if (i > 0)
|
| + ports_buf << ",";
|
| + ports_buf << message->ports()[i];
|
| + }
|
| +#endif
|
| +
|
| + if (message->num_ports() > 0) {
|
| + // Note: Another thread could be trying to send the same ports, so we need
|
| + // to ensure that they are ours to send before we mutate their state.
|
| +
|
| + std::vector<scoped_refptr<Port>> ports;
|
| + ports.resize(message->num_ports());
|
| +
|
| + {
|
| + for (size_t i = 0; i < message->num_ports(); ++i) {
|
| + ports[i] = GetPort_Locked(message->ports()[i]);
|
| + ports[i]->lock.Acquire();
|
| +
|
| + int error = OK;
|
| + if (ports[i]->state != Port::kReceiving)
|
| + error = ERROR_PORT_STATE_UNEXPECTED;
|
| + else if (message->ports()[i] == port->peer_port_name)
|
| + error = ERROR_PORT_CANNOT_SEND_PEER;
|
| +
|
| + if (error != OK) {
|
| + // Oops, we cannot send this port.
|
| + for (size_t j = 0; j <= i; ++j)
|
| + ports[i]->lock.Release();
|
| + // Backpedal on the sequence number.
|
| + port->next_sequence_num_to_send--;
|
| + return error;
|
| + }
|
| + }
|
| + }
|
| +
|
| + PortDescriptor* port_descriptors =
|
| + GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
|
| +
|
| + for (size_t i = 0; i < message->num_ports(); ++i) {
|
| + WillSendPort_Locked(ports[i].get(),
|
| + port->peer_node_name,
|
| + message->mutable_ports() + i,
|
| + port_descriptors + i);
|
| + }
|
| +
|
| + for (size_t i = 0; i < message->num_ports(); ++i)
|
| + ports[i]->lock.Release();
|
| + }
|
| +
|
| +#if !defined(NDEBUG)
|
| + DVLOG(2) << "Sending message "
|
| + << GetEventData<UserEventData>(*message)->sequence_num
|
| + << " [ports=" << ports_buf.str() << "]"
|
| + << " from " << port_name << "@" << name_
|
| + << " to " << port->peer_port_name << "@" << port->peer_node_name;
|
| +#endif
|
| +
|
| + GetMutableEventHeader(message)->port_name = port->peer_port_name;
|
| + return OK;
|
| +}
|
| +
|
| +int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
|
| + ports_lock_.AssertAcquired();
|
| + port->lock.AssertAcquired();
|
| +
|
| + for (;;) {
|
| + ScopedMessage message;
|
| + port->message_queue.GetNextMessageIf(nullptr, &message);
|
| + if (!message)
|
| + break;
|
| +
|
| + int rv = WillSendMessage_Locked(port, port_name, message.get());
|
| + if (rv != OK)
|
| + return rv;
|
| +
|
| + delegate_->ForwardMessage(port->peer_node_name, std::move(message));
|
| + }
|
| + return OK;
|
| +}
|
| +
|
| +void Node::InitiateProxyRemoval_Locked(Port* port,
|
| + const PortName& port_name) {
|
| + port->lock.AssertAcquired();
|
| +
|
| + // To remove this node, we start by notifying the connected graph that we are
|
| + // a proxy. This allows whatever port is referencing this node to skip it.
|
| + // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
|
| + // the peer was closed in the meantime).
|
| +
|
| + ObserveProxyEventData data;
|
| + data.proxy_node_name = name_;
|
| + data.proxy_port_name = port_name;
|
| + data.proxy_to_node_name = port->peer_node_name;
|
| + data.proxy_to_port_name = port->peer_port_name;
|
| +
|
| + delegate_->ForwardMessage(
|
| + port->peer_node_name,
|
| + NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
|
| +}
|
| +
|
| +void Node::MaybeRemoveProxy_Locked(Port* port,
|
| + const PortName& port_name) {
|
| + // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
|
| + ports_lock_.AssertAcquired();
|
| + port->lock.AssertAcquired();
|
| +
|
| + DCHECK(port->state == Port::kProxying);
|
| +
|
| + // Make sure we have seen ObserveProxyAck before removing the port.
|
| + if (!port->remove_proxy_on_last_message)
|
| + return;
|
| +
|
| + if (!CanAcceptMoreMessages(port)) {
|
| + // This proxy port is done. We can now remove it!
|
| + ErasePort_Locked(port_name);
|
| +
|
| + if (port->send_on_proxy_removal) {
|
| + NodeName to_node = port->send_on_proxy_removal->first;
|
| + ScopedMessage& message = port->send_on_proxy_removal->second;
|
| +
|
| + delegate_->ForwardMessage(to_node, std::move(message));
|
| + }
|
| + } else {
|
| + DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
|
| + << " now; waiting for more messages";
|
| + }
|
| +}
|
| +
|
| +ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
|
| + const EventType& type,
|
| + const void* data,
|
| + size_t num_data_bytes) {
|
| + ScopedMessage message;
|
| + delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
|
| +
|
| + EventHeader* header = GetMutableEventHeader(message.get());
|
| + header->port_name = port_name;
|
| + header->type = type;
|
| + header->padding = 0;
|
| +
|
| + if (num_data_bytes)
|
| + memcpy(header + 1, data, num_data_bytes);
|
| +
|
| + return message;
|
| +}
|
| +
|
| +} // namespace ports
|
| +} // namespace edk
|
| +} // namespace mojo
|
|
|