Index: mojo/edk/system/node_channel.cc |
diff --git a/mojo/edk/system/node_channel.cc b/mojo/edk/system/node_channel.cc |
index 0e2ef52769603ffe4fbd63debf02e68bf257a3e1..7e76fe4cd514ff29ef892b463b1ef5281be4f342 100644 |
--- a/mojo/edk/system/node_channel.cc |
+++ b/mojo/edk/system/node_channel.cc |
@@ -8,9 +8,15 @@ |
#include <limits> |
#include <sstream> |
+#include "base/bind.h" |
+#include "base/location.h" |
#include "base/logging.h" |
#include "mojo/edk/system/channel.h" |
+#if defined(OS_MACOSX) && !defined(OS_IOS) |
+#include "mojo/edk/system/mach_port_relay.h" |
+#endif |
+ |
namespace mojo { |
namespace edk { |
@@ -32,7 +38,7 @@ enum class MessageType : uint32_t { |
REQUEST_PORT_MERGE, |
REQUEST_INTRODUCTION, |
INTRODUCE, |
-#if defined(OS_WIN) |
+#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
RELAY_PORTS_MESSAGE, |
#endif |
}; |
@@ -98,7 +104,7 @@ struct IntroductionData { |
ports::NodeName name; |
}; |
-#if defined(OS_WIN) |
+#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
// This struct is followed by the full payload of a message to be relayed. |
struct RelayPortsMessageData { |
ports::NodeName destination; |
@@ -117,7 +123,7 @@ Channel::MessagePtr CreateMessage(MessageType type, |
header->padding = 0; |
*out_data = reinterpret_cast<DataType*>(&header[1]); |
return message; |
-}; |
+} |
template <typename DataType> |
void GetMessagePayload(const void* bytes, DataType** out_data) { |
@@ -152,12 +158,24 @@ void NodeChannel::GetPortsMessageData(Channel::Message* message, |
} |
void NodeChannel::Start() { |
+#if defined(OS_MACOSX) && !defined(OS_IOS) |
+ MachPortRelay* relay = delegate_->GetMachPortRelay(); |
+ if (relay) |
+ relay->AddObserver(this); |
+#endif |
+ |
base::AutoLock lock(channel_lock_); |
DCHECK(channel_); |
channel_->Start(); |
} |
void NodeChannel::ShutDown() { |
+#if defined(OS_MACOSX) && !defined(OS_IOS) |
+ MachPortRelay* relay = delegate_->GetMachPortRelay(); |
+ if (relay) |
+ relay->RemoveObserver(this); |
+#endif |
+ |
base::AutoLock lock(channel_lock_); |
if (channel_) { |
channel_->ShutDown(); |
@@ -303,9 +321,10 @@ void NodeChannel::Introduce(const ports::NodeName& name, |
WriteChannelMessage(std::move(message)); |
} |
-#if defined(OS_WIN) |
+#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
void NodeChannel::RelayPortsMessage(const ports::NodeName& destination, |
Channel::MessagePtr message) { |
+#if defined(OS_WIN) |
DCHECK(message->has_handles()); |
// Note that this is only used on Windows, and on Windows all platform |
@@ -325,9 +344,27 @@ void NodeChannel::RelayPortsMessage(const ports::NodeName& destination, |
ScopedPlatformHandleVectorPtr handles = message->TakeHandles(); |
handles->clear(); |
+#else |
+ DCHECK(message->has_mach_ports()); |
+ |
+ // On OSX, the handles are extracted from the relayed message and attached to |
+ // the wrapper. The broker then takes the handles attached to the wrapper and |
+ // moves them back to the relayed message. This is necessary because the |
+ // message may contain fds which need to be attached to the outer message so |
+ // that they can be transferred to the broker. |
+ ScopedPlatformHandleVectorPtr handles = message->TakeHandles(); |
+ size_t num_bytes = sizeof(RelayPortsMessageData) + message->data_num_bytes(); |
+ RelayPortsMessageData* data; |
+ Channel::MessagePtr relay_message = CreateMessage( |
+ MessageType::RELAY_PORTS_MESSAGE, num_bytes, handles->size(), &data); |
+ data->destination = destination; |
+ memcpy(data + 1, message->data(), message->data_num_bytes()); |
+ relay_message->SetHandles(std::move(handles)); |
+#endif // defined(OS_WIN) |
+ |
WriteChannelMessage(std::move(relay_message)); |
} |
-#endif |
+#endif // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
NodeChannel::NodeChannel(Delegate* delegate, |
ScopedPlatformHandle platform_handle, |
@@ -361,7 +398,19 @@ void NodeChannel::OnChannelMessage(const void* payload, |
} |
} |
} |
-#endif |
+#elif defined(OS_MACOSX) && !defined(OS_IOS) |
+ // If we're not the root, receive any mach ports from the message. If we're |
+ // the root, the only message containing mach ports should be a |
+ // RELAY_PORTS_MESSAGE. |
+ { |
+ MachPortRelay* relay = delegate_->GetMachPortRelay(); |
+ if (handles && !relay) { |
+ if (!MachPortRelay::ReceivePorts(handles.get())) { |
+ LOG(ERROR) << "Error receiving mach ports."; |
+ } |
+ } |
+ } |
+#endif // defined(OS_WIN) |
const Header* header = static_cast<const Header*>(payload); |
switch (header->type) { |
@@ -484,7 +533,7 @@ void NodeChannel::OnChannelMessage(const void* payload, |
break; |
} |
-#if defined(OS_WIN) |
+#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
case MessageType::RELAY_PORTS_MESSAGE: { |
base::ProcessHandle from_process; |
{ |
@@ -500,6 +549,24 @@ void NodeChannel::OnChannelMessage(const void* payload, |
DLOG(ERROR) << "Dropping invalid relay message."; |
break; |
} |
+#if defined(OS_MACOSX) && !defined(OS_IOS) |
+ message->SetHandles(std::move(handles)); |
+ MachPortRelay* relay = delegate_->GetMachPortRelay(); |
+ if (!relay) { |
+ LOG(ERROR) << "Receiving mach ports without a port relay from " |
+ << remote_node_name_ << ". Dropping message."; |
+ break; |
+ } |
+ { |
+ base::AutoLock lock(pending_mach_messages_lock_); |
+ if (relay->port_provider()->TaskForPid(from_process) == |
+ MACH_PORT_NULL) { |
+ pending_relay_messages_.push( |
+ std::make_pair(data->destination, std::move(message))); |
+ break; |
+ } |
+ } |
+#endif |
delegate_->OnRelayPortsMessage(remote_node_name_, from_process, |
data->destination, std::move(message)); |
break; |
@@ -526,6 +593,58 @@ void NodeChannel::OnChannelError() { |
delegate_->OnChannelError(node_name); |
} |
+#if defined(OS_MACOSX) && !defined(OS_IOS) |
+void NodeChannel::OnProcessReady(base::ProcessHandle process) { |
+ io_task_runner_->PostTask(FROM_HERE, base::Bind( |
+ &NodeChannel::ProcessPendingMessagesWithMachPorts, this)); |
+} |
+ |
+void NodeChannel::ProcessPendingMessagesWithMachPorts() { |
+ MachPortRelay* relay = delegate_->GetMachPortRelay(); |
+ DCHECK(relay); |
+ |
+ base::ProcessHandle remote_process_handle; |
+ { |
+ base::AutoLock lock(remote_process_handle_lock_); |
+ remote_process_handle = remote_process_handle_; |
+ } |
+ PendingMessageQueue pending_writes; |
+ PendingRelayMessageQueue pending_relays; |
+ { |
+ base::AutoLock lock(pending_mach_messages_lock_); |
+ pending_writes.swap(pending_write_messages_); |
+ pending_relays.swap(pending_relay_messages_); |
+ } |
+ DCHECK(pending_writes.empty() && pending_relays.empty()); |
+ |
+ while (!pending_writes.empty()) { |
+ Channel::MessagePtr message = std::move(pending_writes.front()); |
+ pending_writes.pop(); |
+ if (!relay->SendPortsToProcess(message.get(), remote_process_handle)) { |
+ LOG(ERROR) << "Error on sending mach ports. Remote process is likely " |
+ << "gone. Dropping message."; |
+ return; |
+ } |
+ |
+ base::AutoLock lock(channel_lock_); |
+ if (!channel_) { |
+ DLOG(ERROR) << "Dropping message on closed channel."; |
+ break; |
+ } else { |
+ channel_->Write(std::move(message)); |
+ } |
+ } |
+ |
+ while (!pending_relays.empty()) { |
+ ports::NodeName destination = pending_relays.front().first; |
+ Channel::MessagePtr message = std::move(pending_relays.front().second); |
+ pending_relays.pop(); |
+ delegate_->OnRelayPortsMessage(remote_node_name_, remote_process_handle, |
+ destination, std::move(message)); |
+ } |
+} |
+#endif |
+ |
void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) { |
#if defined(OS_WIN) |
// Map handles to the destination process. Note: only messages from a |
@@ -550,6 +669,39 @@ void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) { |
} |
} |
} |
+#elif defined(OS_MACOSX) && !defined(OS_IOS) |
+ // On OSX, we need to transfer mach ports to the destination process before |
+ // transferring the message itself. |
+ if (message->has_mach_ports()) { |
+ MachPortRelay* relay = delegate_->GetMachPortRelay(); |
+ if (relay) { |
+ base::ProcessHandle remote_process_handle; |
+ { |
+ base::AutoLock lock(remote_process_handle_lock_); |
+ // Expect that the receiving node is a child. |
+ DCHECK(remote_process_handle_ != base::kNullProcessHandle); |
+ remote_process_handle = remote_process_handle_; |
+ } |
+ { |
+ base::AutoLock lock(pending_mach_messages_lock_); |
+ if (relay->port_provider()->TaskForPid(remote_process_handle) == |
+ MACH_PORT_NULL) { |
+ // It is also possible for TaskForPid() to return MACH_PORT_NULL when |
+ // the process has started, then died. In that case, the queued |
+ // message will never be processed. But that's fine since we're about |
+ // to die anyway. |
+ pending_write_messages_.push(std::move(message)); |
+ return; |
+ } |
+ } |
+ |
+ if (!relay->SendPortsToProcess(message.get(), remote_process_handle)) { |
+ LOG(ERROR) << "Error on sending mach ports. Remote process is likely " |
+ << "gone. Dropping message."; |
+ return; |
+ } |
+ } |
+ } |
#endif |
base::AutoLock lock(channel_lock_); |