Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1194)

Unified Diff: mojo/edk/system/child_broker.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/child_broker.h ('k') | mojo/edk/system/child_broker_host.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/child_broker.cc
diff --git a/mojo/edk/system/child_broker.cc b/mojo/edk/system/child_broker.cc
index 7d7b4c664eea16cb1f2e0fcaf811ffab96a8e912..fbfeb3d649bc7bca87848351858d9b97d2b4491b 100644
--- a/mojo/edk/system/child_broker.cc
+++ b/mojo/edk/system/child_broker.cc
@@ -4,9 +4,13 @@
#include "mojo/edk/system/child_broker.h"
+#include "base/bind.h"
#include "base/logging.h"
#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/system/broker_messages.h"
+#include "mojo/edk/system/message_pipe_dispatcher.h"
+#include "mojo/edk/system/routed_raw_channel.h"
namespace mojo {
namespace edk {
@@ -17,23 +21,41 @@ ChildBroker* ChildBroker::GetInstance() {
}
void ChildBroker::SetChildBrokerHostHandle(ScopedPlatformHandle handle) {
- handle_ = handle.Pass();
+ ScopedPlatformHandle parent_async_channel_handle;
+#if defined(OS_POSIX)
+ parent_async_channel_handle = handle.Pass();
+#else
+ // On Windows we have two pipes to the parent. The first is for the token
+ // exchange for creating and passing handles, since the child needs the
+ // parent's help if it is sandboxed. The second is the same as POSIX, which is
+ // used for multiplexing related messages. So on Windows, we send the second
+ // pipe as the first string over the first one.
+ parent_sync_channel_ = handle.Pass();
+
+ HANDLE parent_handle = INVALID_HANDLE_VALUE;
+ DWORD bytes_read = 0;
+ BOOL rv = ReadFile(parent_sync_channel_.get().handle, &parent_handle,
+ sizeof(parent_handle), &bytes_read, NULL);
+ CHECK(rv);
+ parent_async_channel_handle.reset(PlatformHandle(parent_handle));
+#endif
+
+ parent_async_channel_ =
+ RawChannel::Create(parent_async_channel_handle.Pass());
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(parent_async_channel_),
+ this));
+
lock_.Unlock();
}
#if defined(OS_WIN)
void ChildBroker::CreatePlatformChannelPair(
ScopedPlatformHandle* server, ScopedPlatformHandle* client) {
- BrokerMessage message;
- message.size = kBrokerMessageHeaderSize;
- message.id = CREATE_PLATFORM_CHANNEL_PAIR;
-
- uint32_t response_size = 2 * sizeof(HANDLE);
- HANDLE handles[2];
- if (WriteAndReadResponse(&message, handles, response_size)) {
- server->reset(PlatformHandle(handles[0]));
- client->reset(PlatformHandle(handles[1]));
- }
+ lock_.Lock();
+ CreatePlatformChannelPairNoLock(server, client);
+ lock_.Unlock();
}
void ChildBroker::HandleToToken(const PlatformHandle* platform_handles,
@@ -49,7 +71,9 @@ void ChildBroker::HandleToToken(const PlatformHandle* platform_handles,
message->handles[i] = platform_handles[i].handle;
uint32_t response_size = static_cast<int>(count) * sizeof(uint64_t);
+ lock_.Lock();
WriteAndReadResponse(message, tokens, response_size);
+ lock_.Unlock();
}
void ChildBroker::TokenToHandle(const uint64_t* tokens,
@@ -67,14 +91,85 @@ void ChildBroker::TokenToHandle(const uint64_t* tokens,
std::vector<HANDLE> handles_temp(count);
uint32_t response_size =
static_cast<uint32_t>(handles_temp.size()) * sizeof(HANDLE);
+ lock_.Lock();
if (WriteAndReadResponse(message, &handles_temp[0], response_size)) {
for (uint32_t i = 0; i < count; ++i)
handles[i].handle = handles_temp[i];
+ lock_.Unlock();
}
}
#endif
-ChildBroker::ChildBroker() {
+void ChildBroker::ConnectMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+
+ ConnectMessagePipeMessage data;
+ data.pipe_id = pipe_id;
+ if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
+ // Both ends of the message pipe are in the same process.
+ // First, tell the browser side that to remove its bookkeeping for a pending
+ // connect, since it'll never get the other side.
+
+ data.type = CANCEL_CONNECT_MESSAGE_PIPE;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data));
+ parent_async_channel_->WriteMessage(message.Pass());
+
+ if (!in_process_pipes_channel1_) {
+ ScopedPlatformHandle server_handle, client_handle;
+#if defined(OS_WIN)
+ CreatePlatformChannelPairNoLock(&server_handle, &client_handle);
+#else
+ PlatformChannelPair channel_pair;
+ server_handle = channel_pair.PassServerHandle();
+ client_handle = channel_pair.PassClientHandle();
+#endif
+ in_process_pipes_channel1_ = new RoutedRawChannel(
+ server_handle.Pass(),
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
+ in_process_pipes_channel2_ = new RoutedRawChannel(
+ client_handle.Pass(),
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
+ }
+
+ connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_;
+ connected_pipes_[message_pipe] = in_process_pipes_channel2_;
+ in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]);
+ in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe);
+ pending_connects_[pipe_id]->GotNonTransferableChannel(
+ in_process_pipes_channel1_->channel());
+ message_pipe->GotNonTransferableChannel(
+ in_process_pipes_channel2_->channel());
+
+ pending_connects_.erase(pipe_id);
+ lock_.Unlock();
+ return;
+ }
+
+ data.type = CONNECT_MESSAGE_PIPE;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data));
+ pending_connects_[pipe_id] = message_pipe;
+ parent_async_channel_->WriteMessage(message.Pass());
+
+ lock_.Unlock();
+}
+
+void ChildBroker::CloseMessagePipe(
+ uint64_t pipe_id, MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+ CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end());
+ connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe);
+ connected_pipes_.erase(message_pipe);
+ lock_.Unlock();
+}
+
+ChildBroker::ChildBroker()
+ : in_process_pipes_channel1_(nullptr),
+ in_process_pipes_channel2_(nullptr) {
DCHECK(!internal::g_broker);
internal::g_broker = this;
// Block any threads from calling this until we have a pipe to the parent.
@@ -84,18 +179,80 @@ ChildBroker::ChildBroker() {
ChildBroker::~ChildBroker() {
}
+void ChildBroker::OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+ MultiplexMessages type =
+ *static_cast<const MultiplexMessages*>(message_view.bytes());
+ if (type == CONNECT_TO_PROCESS) {
+ DCHECK_EQ(platform_handles->size(), 1u);
+ ScopedPlatformHandle handle((*platform_handles.get())[0]);
+ (*platform_handles.get())[0] = PlatformHandle();
+
+ const ConnectToProcessMessage* message =
+ static_cast<const ConnectToProcessMessage*>(message_view.bytes());
+
+ CHECK(channels_.find(message->process_id) == channels_.end());
+ channels_[message->process_id] = new RoutedRawChannel(
+ handle.Pass(),
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
+ } else if (type == PEER_PIPE_CONNECTED) {
+ DCHECK(!platform_handles);
+ const PeerPipeConnectedMessage* message =
+ static_cast<const PeerPipeConnectedMessage*>(message_view.bytes());
+
+ uint64_t pipe_id = message->pipe_id;
+ uint64_t peer_pid = message->process_id;
+
+ CHECK(pending_connects_.find(pipe_id) != pending_connects_.end());
+ MessagePipeDispatcher* pipe = pending_connects_[pipe_id];
+ pending_connects_.erase(pipe_id);
+ if (channels_.find(peer_pid) == channels_.end()) {
+ // We saw the peer process die before we got the reply from the parent.
+ pipe->OnError(ERROR_READ_SHUTDOWN);
+ } else {
+ CHECK(connected_pipes_.find(pipe) == connected_pipes_.end());
+ connected_pipes_[pipe] = channels_[peer_pid];
+ channels_[peer_pid]->AddRoute(pipe_id, pipe);
+ pipe->GotNonTransferableChannel(channels_[peer_pid]->channel());
+ }
+ } else {
+ NOTREACHED();
+ }
+
+ lock_.Unlock();
+}
+
+void ChildBroker::OnError(Error error) {
+ // The parent process shut down.
+}
+
+void ChildBroker::ChannelDestructed(RoutedRawChannel* channel) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+ for (auto it : channels_) {
+ if (it.second == channel) {
+ channels_.erase(it.first);
+ break;
+ }
+ }
+ lock_.Unlock();
+}
+
+#if defined(OS_WIN)
+
bool ChildBroker::WriteAndReadResponse(BrokerMessage* message,
void* response,
uint32_t response_size) {
- lock_.Lock();
- CHECK(handle_.is_valid());
+ CHECK(parent_sync_channel_.is_valid());
bool result = true;
-#if defined(OS_WIN)
DWORD bytes_written = 0;
// This will always write in one chunk per
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150.aspx.
- BOOL rv = WriteFile(handle_.get().handle, message, message->size,
+ BOOL rv = WriteFile(parent_sync_channel_.get().handle, message, message->size,
&bytes_written, NULL);
if (!rv || bytes_written != message->size) {
LOG(ERROR) << "Child token serializer couldn't write message.";
@@ -103,8 +260,8 @@ bool ChildBroker::WriteAndReadResponse(BrokerMessage* message,
} else {
while (response_size) {
DWORD bytes_read = 0;
- rv = ReadFile(handle_.get().handle, response, response_size, &bytes_read,
- NULL);
+ rv = ReadFile(parent_sync_channel_.get().handle, response, response_size,
+ &bytes_read, NULL);
if (!rv) {
LOG(ERROR) << "Child token serializer couldn't read result.";
result = false;
@@ -114,12 +271,25 @@ bool ChildBroker::WriteAndReadResponse(BrokerMessage* message,
response = static_cast<char*>(response) + bytes_read;
}
}
-#endif
-
- lock_.Unlock();
return result;
}
+void ChildBroker::CreatePlatformChannelPairNoLock(
+ ScopedPlatformHandle* server, ScopedPlatformHandle* client) {
+ BrokerMessage message;
+ message.size = kBrokerMessageHeaderSize;
+ message.id = CREATE_PLATFORM_CHANNEL_PAIR;
+
+ uint32_t response_size = 2 * sizeof(HANDLE);
+ HANDLE handles[2];
+ if (WriteAndReadResponse(&message, handles, response_size)) {
+ server->reset(PlatformHandle(handles[0]));
+ client->reset(PlatformHandle(handles[1]));
+ }
+}
+
+#endif
+
} // namespace edk
} // namespace mojo
« no previous file with comments | « mojo/edk/system/child_broker.h ('k') | mojo/edk/system/child_broker_host.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698