Index: mojo/edk/system/child_broker.cc |
diff --git a/mojo/edk/system/child_broker.cc b/mojo/edk/system/child_broker.cc |
index 7d7b4c664eea16cb1f2e0fcaf811ffab96a8e912..32e4c4703d957ec4f12cc3a11d2a31e01dad6dab 100644 |
--- a/mojo/edk/system/child_broker.cc |
+++ b/mojo/edk/system/child_broker.cc |
@@ -4,9 +4,13 @@ |
#include "mojo/edk/system/child_broker.h" |
+#include "base/bind.h" |
#include "base/logging.h" |
#include "mojo/edk/embedder/embedder_internal.h" |
+#include "mojo/edk/embedder/platform_channel_pair.h" |
#include "mojo/edk/system/broker_messages.h" |
+#include "mojo/edk/system/message_pipe_dispatcher.h" |
+#include "mojo/edk/system/routed_raw_channel.h" |
namespace mojo { |
namespace edk { |
@@ -17,23 +21,41 @@ ChildBroker* ChildBroker::GetInstance() { |
} |
void ChildBroker::SetChildBrokerHostHandle(ScopedPlatformHandle handle) { |
- handle_ = handle.Pass(); |
+ ScopedPlatformHandle parent_async_channel_handle; |
+#if defined(OS_POSIX) |
+ parent_async_channel_handle = handle.Pass(); |
+#else |
+ // On Windows we have two pipes to the parent. The first is for the token |
+ // exchange for creating and passing handles, since the child needs the |
+ // parent's help if it is sandboxed. The second is the same as POSIX, which is |
+ // used for multiplexing related messages. So on Windows, we send the second |
+ // pipe as the first string over the first one. |
+ parent_sync_channel_ = handle.Pass(); |
+ |
+ HANDLE parent_handle = INVALID_HANDLE_VALUE; |
+ DWORD bytes_read = 0; |
+ BOOL rv = ReadFile(parent_sync_channel_.get().handle, &parent_handle, |
+ sizeof(parent_handle), &bytes_read, NULL); |
+ DCHECK(rv); |
Tom Sepez
2015/12/04 17:59:43
Do we want something stronger than a DCHECK here?
jam
2015/12/05 00:09:34
sure, changed it to CHECK so that we know if this
|
+ parent_async_channel_handle.reset(PlatformHandle(parent_handle)); |
+#endif |
+ |
+ parent_async_channel_ = |
+ RawChannel::Create(parent_async_channel_handle.Pass()); |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannel::Init, base::Unretained(parent_async_channel_), |
+ this)); |
+ |
lock_.Unlock(); |
} |
#if defined(OS_WIN) |
void ChildBroker::CreatePlatformChannelPair( |
ScopedPlatformHandle* server, ScopedPlatformHandle* client) { |
- BrokerMessage message; |
- message.size = kBrokerMessageHeaderSize; |
- message.id = CREATE_PLATFORM_CHANNEL_PAIR; |
- |
- uint32_t response_size = 2 * sizeof(HANDLE); |
- HANDLE handles[2]; |
- if (WriteAndReadResponse(&message, handles, response_size)) { |
- server->reset(PlatformHandle(handles[0])); |
- client->reset(PlatformHandle(handles[1])); |
- } |
+ lock_.Lock(); |
+ CreatePlatformChannelPairNoLock(server, client); |
+ lock_.Unlock(); |
} |
void ChildBroker::HandleToToken(const PlatformHandle* platform_handles, |
@@ -49,7 +71,9 @@ void ChildBroker::HandleToToken(const PlatformHandle* platform_handles, |
message->handles[i] = platform_handles[i].handle; |
uint32_t response_size = static_cast<int>(count) * sizeof(uint64_t); |
+ lock_.Lock(); |
WriteAndReadResponse(message, tokens, response_size); |
+ lock_.Unlock(); |
} |
void ChildBroker::TokenToHandle(const uint64_t* tokens, |
@@ -67,14 +91,85 @@ void ChildBroker::TokenToHandle(const uint64_t* tokens, |
std::vector<HANDLE> handles_temp(count); |
uint32_t response_size = |
static_cast<uint32_t>(handles_temp.size()) * sizeof(HANDLE); |
+ lock_.Lock(); |
if (WriteAndReadResponse(message, &handles_temp[0], response_size)) { |
for (uint32_t i = 0; i < count; ++i) |
handles[i].handle = handles_temp[i]; |
+ lock_.Unlock(); |
} |
} |
#endif |
-ChildBroker::ChildBroker() { |
+void ChildBroker::ConnectMessagePipe(uint64_t pipe_id, |
+ MessagePipeDispatcher* message_pipe) { |
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
+ lock_.Lock(); |
+ |
+ ConnectMessagePipeMessage data; |
+ data.pipe_id = pipe_id; |
+ if (pending_connects_.find(pipe_id) != pending_connects_.end()) { |
+ // Both ends of the message pipe are in the same process. |
+ // First, tell the browser side that to remove its bookkeeping for a pending |
+ // connect, since it'll never get the other side. |
+ |
+ data.type = CANCEL_CONNECT_MESSAGE_PIPE; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data)); |
+ parent_async_channel_->WriteMessage(message.Pass()); |
+ |
+ if (!in_process_pipes_channel1_) { |
+ ScopedPlatformHandle server_handle, client_handle; |
+#if defined(OS_WIN) |
+ CreatePlatformChannelPairNoLock(&server_handle, &client_handle); |
+#else |
+ PlatformChannelPair channel_pair; |
+ server_handle = channel_pair.PassServerHandle(); |
+ client_handle = channel_pair.PassClientHandle(); |
+#endif |
+ in_process_pipes_channel1_ = new RoutedRawChannel( |
+ server_handle.Pass(), |
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this))); |
+ in_process_pipes_channel2_ = new RoutedRawChannel( |
+ client_handle.Pass(), |
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this))); |
+ } |
+ |
+ connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_; |
+ connected_pipes_[message_pipe] = in_process_pipes_channel2_; |
+ in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]); |
+ in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe); |
+ pending_connects_[pipe_id]->GotNonTransferableChannel( |
+ in_process_pipes_channel1_->channel()); |
+ message_pipe->GotNonTransferableChannel( |
+ in_process_pipes_channel2_->channel()); |
+ |
+ pending_connects_.erase(pipe_id); |
+ lock_.Unlock(); |
+ return; |
+ } |
+ |
+ data.type = CONNECT_MESSAGE_PIPE; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data)); |
+ pending_connects_[pipe_id] = message_pipe; |
+ parent_async_channel_->WriteMessage(message.Pass()); |
+ |
+ lock_.Unlock(); |
+} |
+ |
+void ChildBroker::CloseMessagePipe( |
+ uint64_t pipe_id, MessagePipeDispatcher* message_pipe) { |
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
+ lock_.Lock(); |
+ CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end()); |
+ connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe); |
+ connected_pipes_.erase(message_pipe); |
+ lock_.Unlock(); |
+} |
+ |
+ChildBroker::ChildBroker() |
+ : in_process_pipes_channel1_(nullptr), |
+ in_process_pipes_channel2_(nullptr) { |
DCHECK(!internal::g_broker); |
internal::g_broker = this; |
// Block any threads from calling this until we have a pipe to the parent. |
@@ -84,18 +179,76 @@ ChildBroker::ChildBroker() { |
ChildBroker::~ChildBroker() { |
} |
+void ChildBroker::OnReadMessage( |
+ const MessageInTransit::View& message_view, |
+ ScopedPlatformHandleVectorPtr platform_handles) { |
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
+ lock_.Lock(); |
+ MultiplexMessages type = |
+ *static_cast<const MultiplexMessages*>(message_view.bytes()); |
+ if (type == CONNECT_TO_PROCESS) { |
+ DCHECK_EQ(platform_handles->size(), 1u); |
+ ScopedPlatformHandle handle((*platform_handles.get())[0]); |
+ (*platform_handles.get())[0] = PlatformHandle(); |
+ |
+ const ConnectToProcessMessage* message = |
+ static_cast<const ConnectToProcessMessage*>(message_view.bytes()); |
+ |
+ CHECK(channels_.find(message->process_id) == channels_.end()); |
+ channels_[message->process_id] = new RoutedRawChannel( |
+ handle.Pass(), |
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this))); |
+ } else if (type == PEER_PIPE_CONNECTED) { |
+ DCHECK(!platform_handles); |
+ const PeerPipeConnectedMessage* message = |
+ static_cast<const PeerPipeConnectedMessage*>(message_view.bytes()); |
+ |
+ uint64_t pipe_id = message->pipe_id; |
+ uint64_t peer_pid = message->process_id; |
+ |
+ CHECK(channels_.find(peer_pid) != channels_.end()); |
+ CHECK(pending_connects_.find(pipe_id) != pending_connects_.end()); |
+ MessagePipeDispatcher* pipe = pending_connects_[pipe_id]; |
+ pending_connects_.erase(pipe_id); |
+ CHECK(connected_pipes_.find(pipe) == connected_pipes_.end()); |
+ connected_pipes_[pipe] = channels_[peer_pid]; |
+ channels_[peer_pid]->AddRoute(pipe_id, pipe); |
+ pipe->GotNonTransferableChannel(channels_[peer_pid]->channel()); |
+ } else { |
+ NOTREACHED(); |
+ } |
+ |
+ lock_.Unlock(); |
+} |
+ |
+void ChildBroker::OnError(Error error) { |
+ // The parent process shut down. |
+} |
+ |
+void ChildBroker::ChannelDestructed(RoutedRawChannel* channel) { |
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
+ lock_.Lock(); |
+ for (auto it : channels_) { |
+ if (it.second == channel) { |
+ channels_.erase(it.first); |
+ break; |
+ } |
+ } |
+ lock_.Unlock(); |
+} |
+ |
+#if defined(OS_WIN) |
+ |
bool ChildBroker::WriteAndReadResponse(BrokerMessage* message, |
void* response, |
uint32_t response_size) { |
- lock_.Lock(); |
- CHECK(handle_.is_valid()); |
+ CHECK(parent_sync_channel_.is_valid()); |
bool result = true; |
-#if defined(OS_WIN) |
DWORD bytes_written = 0; |
// This will always write in one chunk per |
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150.aspx. |
- BOOL rv = WriteFile(handle_.get().handle, message, message->size, |
+ BOOL rv = WriteFile(parent_sync_channel_.get().handle, message, message->size, |
&bytes_written, NULL); |
if (!rv || bytes_written != message->size) { |
LOG(ERROR) << "Child token serializer couldn't write message."; |
@@ -103,8 +256,8 @@ bool ChildBroker::WriteAndReadResponse(BrokerMessage* message, |
} else { |
while (response_size) { |
DWORD bytes_read = 0; |
- rv = ReadFile(handle_.get().handle, response, response_size, &bytes_read, |
- NULL); |
+ rv = ReadFile(parent_sync_channel_.get().handle, response, response_size, |
+ &bytes_read, NULL); |
if (!rv) { |
LOG(ERROR) << "Child token serializer couldn't read result."; |
result = false; |
@@ -114,12 +267,25 @@ bool ChildBroker::WriteAndReadResponse(BrokerMessage* message, |
response = static_cast<char*>(response) + bytes_read; |
} |
} |
-#endif |
- |
- lock_.Unlock(); |
return result; |
} |
+void ChildBroker::CreatePlatformChannelPairNoLock( |
+ ScopedPlatformHandle* server, ScopedPlatformHandle* client) { |
+ BrokerMessage message; |
+ message.size = kBrokerMessageHeaderSize; |
+ message.id = CREATE_PLATFORM_CHANNEL_PAIR; |
+ |
+ uint32_t response_size = 2 * sizeof(HANDLE); |
+ HANDLE handles[2]; |
+ if (WriteAndReadResponse(&message, handles, response_size)) { |
+ server->reset(PlatformHandle(handles[0])); |
+ client->reset(PlatformHandle(handles[1])); |
+ } |
+} |
+ |
+#endif |
+ |
} // namespace edk |
} // namespace mojo |