Index: mojo/edk/system/node_channel.cc |
diff --git a/mojo/edk/system/node_channel.cc b/mojo/edk/system/node_channel.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b8d42e94ff5226653d0973c021b3ba1b8963629c |
--- /dev/null |
+++ b/mojo/edk/system/node_channel.cc |
@@ -0,0 +1,417 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/node_channel.h" |
+ |
+#include <cstring> |
+#include <limits> |
+#include <sstream> |
+ |
+#include "base/logging.h" |
+#include "mojo/edk/system/channel.h" |
+ |
+namespace mojo { |
+namespace edk { |
+ |
+namespace { |
+ |
+template <typename T> |
+T Align(T t) { |
+ const auto k = kChannelMessageAlignment; |
+ return t + (k - (t % k)) % k; |
+} |
+ |
+enum class MessageType : uint32_t { |
+ ACCEPT_CHILD, |
+ ACCEPT_PARENT, |
+ PORTS_MESSAGE, |
+ REQUEST_PORT_CONNECTION, |
+ CONNECT_TO_PORT, |
+ REQUEST_INTRODUCTION, |
+ INTRODUCE, |
+#if defined(OS_WIN) |
+ RELAY_PORTS_MESSAGE, |
+#endif |
+}; |
+ |
+struct Header { |
+ MessageType type; |
+ uint32_t padding; |
+}; |
+ |
+static_assert(sizeof(Header) % kChannelMessageAlignment == 0, |
+ "Invalid header size."); |
+ |
+struct AcceptChildData { |
+ ports::NodeName parent_name; |
+ ports::NodeName token; |
+}; |
+ |
+struct AcceptParentData { |
+ ports::NodeName token; |
+ ports::NodeName child_name; |
+}; |
+ |
+// This is followed by arbitrary payload data which is interpreted as a token |
+// string for port location. |
+struct RequestPortConnectionData { |
+ ports::PortName connector_port_name; |
+}; |
+ |
+struct ConnectToPortData { |
+ ports::PortName connector_port_name; |
+ ports::PortName connectee_port_name; |
+}; |
+ |
+// Used for both REQUEST_INTRODUCTION and INTRODUCE. |
+// |
+// For INTRODUCE the message must also include a platform handle the recipient |
+// can use to communicate with the named node. If said handle is omitted, the |
+// peer cannot be introduced. |
+struct IntroductionData { |
+ ports::NodeName name; |
+}; |
+ |
+#if defined(OS_WIN) |
+// This struct is followed by the full payload of a message to be relayed. |
+struct RelayPortsMessageData { |
+ ports::NodeName destination; |
+}; |
+#endif |
+ |
+template <typename DataType> |
+Channel::MessagePtr CreateMessage(MessageType type, |
+ size_t payload_size, |
+ size_t num_handles, |
+ DataType** out_data) { |
+ Channel::MessagePtr message( |
+ new Channel::Message(sizeof(Header) + payload_size, num_handles)); |
+ Header* header = reinterpret_cast<Header*>(message->mutable_payload()); |
+ header->type = type; |
+ header->padding = 0; |
+ *out_data = reinterpret_cast<DataType*>(&header[1]); |
+ return message; |
+}; |
+ |
+template <typename DataType> |
+void GetMessagePayload(const void* bytes, DataType** out_data) { |
+ *out_data = reinterpret_cast<const DataType*>( |
+ static_cast<const char*>(bytes) + sizeof(Header)); |
+} |
+ |
+} // namespace |
+ |
+// static |
+scoped_refptr<NodeChannel> NodeChannel::Create( |
+ Delegate* delegate, |
+ ScopedPlatformHandle platform_handle, |
+ scoped_refptr<base::TaskRunner> io_task_runner) { |
+ return new NodeChannel(delegate, std::move(platform_handle), io_task_runner); |
+} |
+ |
+// static |
+Channel::MessagePtr NodeChannel::CreatePortsMessage(size_t payload_size, |
+ void** payload, |
+ size_t num_handles) { |
+ return CreateMessage(MessageType::PORTS_MESSAGE, payload_size, num_handles, |
+ payload); |
+} |
+ |
+// static |
+void NodeChannel::GetPortsMessageData(Channel::Message* message, |
+ void** data, |
+ size_t* num_data_bytes) { |
+ *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1; |
+ *num_data_bytes = message->payload_size() - sizeof(Header); |
+} |
+ |
+void NodeChannel::Start() { |
+ base::AutoLock lock(channel_lock_); |
+ DCHECK(channel_); |
+ channel_->Start(); |
+} |
+ |
+void NodeChannel::ShutDown() { |
+ base::AutoLock lock(channel_lock_); |
+ if (channel_) { |
+ channel_->ShutDown(); |
+ channel_ = nullptr; |
+ } |
+} |
+ |
+void NodeChannel::SetRemoteProcessHandle(base::ProcessHandle process_handle) { |
+#if defined(OS_WIN) |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ base::AutoLock lock(remote_process_handle_lock_); |
+ remote_process_handle_ = process_handle; |
+#endif |
+} |
+ |
+void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ remote_node_name_ = name; |
+} |
+ |
+void NodeChannel::AcceptChild(const ports::NodeName& parent_name, |
+ const ports::NodeName& token) { |
+ AcceptChildData* data; |
+ Channel::MessagePtr message = CreateMessage( |
+ MessageType::ACCEPT_CHILD, sizeof(AcceptChildData), 0, &data); |
+ data->parent_name = parent_name; |
+ data->token = token; |
+ WriteChannelMessage(std::move(message)); |
+} |
+ |
+void NodeChannel::AcceptParent(const ports::NodeName& token, |
+ const ports::NodeName& child_name) { |
+ AcceptParentData* data; |
+ Channel::MessagePtr message = CreateMessage( |
+ MessageType::ACCEPT_PARENT, sizeof(AcceptParentData), 0, &data); |
+ data->token = token; |
+ data->child_name = child_name; |
+ WriteChannelMessage(std::move(message)); |
+} |
+ |
+void NodeChannel::PortsMessage(Channel::MessagePtr message) { |
+ WriteChannelMessage(std::move(message)); |
+} |
+ |
+void NodeChannel::RequestPortConnection( |
+ const ports::PortName& connector_port_name, |
+ const std::string& token) { |
+ RequestPortConnectionData* data; |
+ Channel::MessagePtr message = CreateMessage( |
+ MessageType::REQUEST_PORT_CONNECTION, |
+ sizeof(RequestPortConnectionData) + token.size(), 0, &data); |
+ data->connector_port_name = connector_port_name; |
+ memcpy(data + 1, token.data(), token.size()); |
+ WriteChannelMessage(std::move(message)); |
+} |
+ |
+void NodeChannel::ConnectToPort(const ports::PortName& connector_port_name, |
+ const ports::PortName& connectee_port_name) { |
+ ConnectToPortData* data; |
+ Channel::MessagePtr message = CreateMessage( |
+ MessageType::CONNECT_TO_PORT, sizeof(ConnectToPortData), 0, &data); |
+ data->connector_port_name = connector_port_name; |
+ data->connectee_port_name = connectee_port_name; |
+ WriteChannelMessage(std::move(message)); |
+} |
+ |
+void NodeChannel::RequestIntroduction(const ports::NodeName& name) { |
+ IntroductionData* data; |
+ Channel::MessagePtr message = CreateMessage( |
+ MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data); |
+ data->name = name; |
+ WriteChannelMessage(std::move(message)); |
+} |
+ |
+void NodeChannel::Introduce(const ports::NodeName& name, |
+ ScopedPlatformHandle handle) { |
+ IntroductionData* data; |
+ ScopedPlatformHandleVectorPtr handles; |
+ if (handle.is_valid()) { |
+ handles.reset(new PlatformHandleVector(1)); |
+ handles->at(0) = handle.release(); |
+ } |
+ Channel::MessagePtr message = CreateMessage( |
+ MessageType::INTRODUCE, sizeof(IntroductionData), handles ? 1 : 0, &data); |
+ message->SetHandles(std::move(handles)); |
+ data->name = name; |
+ WriteChannelMessage(std::move(message)); |
+} |
+ |
+#if defined(OS_WIN) |
+void NodeChannel::RelayPortsMessage(const ports::NodeName& destination, |
+ Channel::MessagePtr message) { |
+ DCHECK(message->has_handles()); |
+ |
+ // Note that this is only used on Windows, and on Windows all platform |
+ // handles are included in the message data. We blindly copy all the data |
+ // here and the relay node (the parent) will duplicate handles as needed. |
+ size_t num_bytes = sizeof(RelayPortsMessageData) + message->data_num_bytes(); |
+ RelayPortsMessageData* data; |
+ Channel::MessagePtr relay_message = CreateMessage( |
+ MessageType::RELAY_PORTS_MESSAGE, num_bytes, 0, &data); |
+ data->destination = destination; |
+ memcpy(data + 1, message->data(), message->data_num_bytes()); |
+ |
+ // When the handles are duplicated in the parent, the source handles will |
+ // be closed. If the parent never receives this message then these handles |
+ // will leak, but that means something else has probably broken and the |
+ // sending process won't likely be around much longer. |
+ ScopedPlatformHandleVectorPtr handles = message->TakeHandles(); |
+ handles->clear(); |
+ |
+ WriteChannelMessage(std::move(relay_message)); |
+} |
+#endif |
+ |
+NodeChannel::NodeChannel(Delegate* delegate, |
+ ScopedPlatformHandle platform_handle, |
+ scoped_refptr<base::TaskRunner> io_task_runner) |
+ : delegate_(delegate), |
+ io_task_runner_(io_task_runner), |
+ channel_( |
+ Channel::Create(this, std::move(platform_handle), io_task_runner_)) { |
+} |
+ |
+NodeChannel::~NodeChannel() { |
+ ShutDown(); |
+} |
+ |
+void NodeChannel::OnChannelMessage(const void* payload, |
+ size_t payload_size, |
+ ScopedPlatformHandleVectorPtr handles) { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ const Header* header = static_cast<const Header*>(payload); |
+ switch (header->type) { |
+ case MessageType::ACCEPT_CHILD: { |
+ const AcceptChildData* data; |
+ GetMessagePayload(payload, &data); |
+ delegate_->OnAcceptChild(remote_node_name_, data->parent_name, |
+ data->token); |
+ break; |
+ } |
+ |
+ case MessageType::ACCEPT_PARENT: { |
+ const AcceptParentData* data; |
+ GetMessagePayload(payload, &data); |
+ delegate_->OnAcceptParent(remote_node_name_, data->token, |
+ data->child_name); |
+ break; |
+ } |
+ |
+ case MessageType::PORTS_MESSAGE: { |
+ size_t num_handles = handles ? handles->size() : 0; |
+ Channel::MessagePtr message( |
+ new Channel::Message(payload_size, num_handles)); |
+ message->SetHandles(std::move(handles)); |
+ memcpy(message->mutable_payload(), payload, payload_size); |
+ delegate_->OnPortsMessage(std::move(message)); |
+ break; |
+ } |
+ |
+ case MessageType::REQUEST_PORT_CONNECTION: { |
+ const RequestPortConnectionData* data; |
+ GetMessagePayload(payload, &data); |
+ |
+ const char* token_data = reinterpret_cast<const char*>(data + 1); |
+ const size_t token_size = payload_size - sizeof(*data) - sizeof(Header); |
+ std::string token(token_data, token_size); |
+ |
+ delegate_->OnRequestPortConnection(remote_node_name_, |
+ data->connector_port_name, token); |
+ break; |
+ } |
+ |
+ case MessageType::CONNECT_TO_PORT: { |
+ const ConnectToPortData* data; |
+ GetMessagePayload(payload, &data); |
+ delegate_->OnConnectToPort(remote_node_name_, data->connector_port_name, |
+ data->connectee_port_name); |
+ break; |
+ } |
+ |
+ case MessageType::REQUEST_INTRODUCTION: { |
+ const IntroductionData* data; |
+ GetMessagePayload(payload, &data); |
+ delegate_->OnRequestIntroduction(remote_node_name_, data->name); |
+ break; |
+ } |
+ |
+ case MessageType::INTRODUCE: { |
+ const IntroductionData* data; |
+ GetMessagePayload(payload, &data); |
+ ScopedPlatformHandle handle; |
+ if (handles && !handles->empty()) { |
+ handle = ScopedPlatformHandle(handles->at(0)); |
+ handles->clear(); |
+ } |
+ delegate_->OnIntroduce(remote_node_name_, data->name, std::move(handle)); |
+ break; |
+ } |
+ |
+#if defined(OS_WIN) |
+ case MessageType::RELAY_PORTS_MESSAGE: { |
+ base::ProcessHandle from_process; |
+ { |
+ base::AutoLock lock(remote_process_handle_lock_); |
+ from_process = remote_process_handle_; |
+ } |
+ const RelayPortsMessageData* data; |
+ GetMessagePayload(payload, &data); |
+ const void* message_start = data + 1; |
+ Channel::MessagePtr message = Channel::Message::Deserialize( |
+ message_start, payload_size - sizeof(Header) - sizeof(*data)); |
+ if (!message) { |
+ DLOG(ERROR) << "Dropping invalid relay message."; |
+ break; |
+ } |
+ delegate_->OnRelayPortsMessage(remote_node_name_, from_process, |
+ data->destination, std::move(message)); |
+ break; |
+ } |
+#endif |
+ |
+ default: |
+ DLOG(ERROR) << "Received unknown message type " |
+ << static_cast<uint32_t>(header->type) << " from node " |
+ << remote_node_name_; |
+ delegate_->OnChannelError(remote_node_name_); |
+ break; |
+ } |
+} |
+ |
+void NodeChannel::OnChannelError() { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ ShutDown(); |
+ // |OnChannelError()| may cause |this| to be destroyed, but still need access |
+ // to the name name after that destruction. So may a copy of |
+ // |remote_node_name_| so it can be used if |this| becomes destroyed. |
+ ports::NodeName node_name = remote_node_name_; |
+ delegate_->OnChannelError(node_name); |
+} |
+ |
+void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) { |
+#if defined(OS_WIN) |
+ // Map handles to the destination process. Note: only messages from the parent |
+ // node should contain handles on Windows. If a child node needs to send |
+ // handles, it should do so via RelayPortsMessage, which stashes the handles |
+ // in the message in such a way that they go undetected here. |
+ |
+ if (message->has_handles()) { |
+ base::ProcessHandle remote_process_handle; |
+ { |
+ base::AutoLock lock(remote_process_handle_lock_); |
+ remote_process_handle = remote_process_handle_; |
+ } |
+ |
+ if (remote_process_handle == base::kNullProcessHandle) { |
+ DLOG(ERROR) << "Sending a message with handles as a non-parent. " |
+ << "This is most likely broken."; |
+ } else { |
+ for (size_t i = 0; i < message->num_handles(); ++i) { |
+ BOOL result = DuplicateHandle( |
+ base::GetCurrentProcessHandle(), message->handles()[i].handle, |
+ remote_process_handle, |
+ reinterpret_cast<HANDLE*>(message->handles() + i), 0, FALSE, |
+ DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); |
+ DCHECK(result); |
+ } |
+ } |
+ } |
+#endif |
+ |
+ base::AutoLock lock(channel_lock_); |
+ if (!channel_) |
+ DLOG(ERROR) << "Dropping message on closed channel."; |
+ else |
+ channel_->Write(std::move(message)); |
+} |
+ |
+} // namespace edk |
+} // namespace mojo |