| Index: mojo/edk/system/broker_state.cc
|
| diff --git a/mojo/edk/system/broker_state.cc b/mojo/edk/system/broker_state.cc
|
| index 6eb4ae94e3f61989dbf02f2a8a26c7ae82f137f1..8bbeaa7532ceaa5ac182bfaf83c2b1145e4d726c 100644
|
| --- a/mojo/edk/system/broker_state.cc
|
| +++ b/mojo/edk/system/broker_state.cc
|
| @@ -4,9 +4,13 @@
|
|
|
| #include "mojo/edk/system/broker_state.h"
|
|
|
| +#include "base/bind.h"
|
| #include "base/rand_util.h"
|
| #include "mojo/edk/embedder/embedder_internal.h"
|
| #include "mojo/edk/embedder/platform_channel_pair.h"
|
| +#include "mojo/edk/system/child_broker_host.h"
|
| +#include "mojo/edk/system/message_pipe_dispatcher.h"
|
| +#include "mojo/edk/system/routed_raw_channel.h"
|
|
|
| namespace mojo {
|
| namespace edk {
|
| @@ -28,7 +32,7 @@ void BrokerState::HandleToToken(
|
| const PlatformHandle* platform_handles,
|
| size_t count,
|
| uint64_t* tokens) {
|
| - base::AutoLock auto_locker(lock_);
|
| + base::AutoLock auto_locker(token_map_lock_);
|
| for (size_t i = 0; i < count; ++i) {
|
| if (platform_handles[i].is_valid()) {
|
| uint64_t token;
|
| @@ -47,7 +51,7 @@ void BrokerState::HandleToToken(
|
| void BrokerState::TokenToHandle(const uint64_t* tokens,
|
| size_t count,
|
| PlatformHandle* handles) {
|
| - base::AutoLock auto_locker(lock_);
|
| + base::AutoLock auto_locker(token_map_lock_);
|
| for (size_t i = 0; i < count; ++i) {
|
| auto it = token_map_.find(tokens[i]);
|
| if (it == token_map_.end()) {
|
| @@ -60,9 +64,149 @@ void BrokerState::TokenToHandle(const uint64_t* tokens,
|
| }
|
| #endif
|
|
|
| -BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") {
|
| - base::Thread::Options options(base::MessageLoop::TYPE_IO, 0);
|
| - broker_thread_.StartWithOptions(options);
|
| +void BrokerState::ConnectMessagePipe(uint64_t pipe_id,
|
| + MessagePipeDispatcher* message_pipe) {
|
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| + base::AutoLock auto_lock(lock_);
|
| + if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
|
| + // Both ends of the message pipe are in this process.
|
| + if (!in_process_pipes_channel1_) {
|
| + PlatformChannelPair channel_pair;
|
| + in_process_pipes_channel1_ = new RoutedRawChannel(
|
| + channel_pair.PassServerHandle(),
|
| + base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
|
| + in_process_pipes_channel2_ = new RoutedRawChannel(
|
| + channel_pair.PassClientHandle(),
|
| + base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
|
| + }
|
| +
|
| + connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_;
|
| + connected_pipes_[message_pipe] = in_process_pipes_channel2_;
|
| + in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]);
|
| + in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe);
|
| + pending_connects_[pipe_id]->GotNonTransferableChannel(
|
| + in_process_pipes_channel1_->channel());
|
| + message_pipe->GotNonTransferableChannel(
|
| + in_process_pipes_channel2_->channel());
|
| +
|
| + pending_connects_.erase(pipe_id);
|
| + return;
|
| + }
|
| +
|
| + if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
|
| + // A child process has already tried to connect.
|
| + EnsureProcessesConnected(base::GetCurrentProcId(),
|
| + pending_child_connects_[pipe_id]->GetProcessId());
|
| + pending_child_connects_[pipe_id]->ConnectMessagePipe(
|
| + pipe_id, base::GetCurrentProcId());
|
| + base::ProcessId peer_pid = pending_child_connects_[pipe_id]->GetProcessId();
|
| + pending_child_connects_.erase(pipe_id);
|
| + connected_pipes_[message_pipe] = child_channels_[peer_pid];
|
| + child_channels_[peer_pid]->AddRoute(pipe_id, message_pipe);
|
| + message_pipe->GotNonTransferableChannel(
|
| + child_channels_[peer_pid]->channel());
|
| + return;
|
| + }
|
| +
|
| + pending_connects_[pipe_id] = message_pipe;
|
| +}
|
| +
|
| +void BrokerState::CloseMessagePipe(uint64_t pipe_id,
|
| + MessagePipeDispatcher* message_pipe) {
|
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| + base::AutoLock auto_lock(lock_);
|
| +
|
| + CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end());
|
| + connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe);
|
| + connected_pipes_.erase(message_pipe);
|
| +}
|
| +
|
| +void BrokerState::ChildBrokerHostCreated(ChildBrokerHost* child_broker_host) {
|
| + base::AutoLock auto_lock(lock_);
|
| + CHECK(child_processes_.find(child_broker_host->GetProcessId()) ==
|
| + child_processes_.end());
|
| + child_processes_[child_broker_host->GetProcessId()] = child_broker_host;
|
| +}
|
| +
|
| +void BrokerState::ChildBrokerHostDestructed(
|
| + ChildBrokerHost* child_broker_host) {
|
| + base::AutoLock auto_lock(lock_);
|
| +
|
| + for (auto it = pending_child_connects_.begin();
|
| + it != pending_child_connects_.end();) {
|
| + if (it->second == child_broker_host) {
|
| + // Since we can't do it = pending_child_connects_.erase(it); until
|
| + // hash_map uses unordered_map on posix.
|
| + auto cur = it++;
|
| + pending_child_connects_.erase(cur);
|
| + } else {
|
| + it++;
|
| + }
|
| + }
|
| +
|
| + base::ProcessId pid = child_broker_host->GetProcessId();
|
| + for (auto it = connected_processes_.begin();
|
| + it != connected_processes_.end();) {
|
| + if ((*it).first == pid || (*it).second == pid) {
|
| + // Since we can't do it = pending_child_connects_.erase(it); until
|
| + // hash_map uses unordered_map on posix.
|
| + auto cur = it++;
|
| + connected_processes_.erase(cur);
|
| + } else {
|
| + it++;
|
| + }
|
| + }
|
| +
|
| + CHECK(child_processes_.find(pid) != child_processes_.end());
|
| + child_processes_.erase(pid);
|
| +}
|
| +
|
| +void BrokerState::HandleConnectMessagePipe(ChildBrokerHost* pipe_process,
|
| + uint64_t pipe_id) {
|
| + base::AutoLock auto_lock(lock_);
|
| + if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
|
| + // Another child process is waiting to connect to the given pipe.
|
| + ChildBrokerHost* pending_pipe_process = pending_child_connects_[pipe_id];
|
| + EnsureProcessesConnected(pipe_process->GetProcessId(),
|
| + pending_pipe_process->GetProcessId());
|
| + pending_pipe_process->ConnectMessagePipe(
|
| + pipe_id, pipe_process->GetProcessId());
|
| + pipe_process->ConnectMessagePipe(
|
| + pipe_id, pending_pipe_process->GetProcessId());
|
| + pending_child_connects_.erase(pipe_id);
|
| + return;
|
| + }
|
| +
|
| + if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
|
| + // This parent process is the other side of the given pipe.
|
| + EnsureProcessesConnected(base::GetCurrentProcId(),
|
| + pipe_process->GetProcessId());
|
| + MessagePipeDispatcher* pending_pipe = pending_connects_[pipe_id];
|
| + connected_pipes_[pending_pipe] =
|
| + child_channels_[pipe_process->GetProcessId()];
|
| + child_channels_[pipe_process->GetProcessId()]->AddRoute(
|
| + pipe_id, pending_pipe);
|
| + pending_pipe->GotNonTransferableChannel(
|
| + child_channels_[pipe_process->GetProcessId()]->channel());
|
| + pipe_process->ConnectMessagePipe(
|
| + pipe_id, base::GetCurrentProcId());
|
| + pending_connects_.erase(pipe_id);
|
| + return;
|
| + }
|
| +
|
| + // This is the first connection request for pipe_id to reach the parent.
|
| + pending_child_connects_[pipe_id] = pipe_process;
|
| +}
|
| +
|
| +void BrokerState::HandleCancelConnectMessagePipe(uint64_t pipe_id) {
|
| + base::AutoLock auto_lock(lock_);
|
| + CHECK(pending_child_connects_.find(pipe_id) != pending_child_connects_.end());
|
| + pending_child_connects_.erase(pipe_id);
|
| +}
|
| +
|
| +BrokerState::BrokerState()
|
| + : in_process_pipes_channel1_(nullptr),
|
| + in_process_pipes_channel2_(nullptr) {
|
| DCHECK(!internal::g_broker);
|
| internal::g_broker = this;
|
| }
|
| @@ -70,5 +214,47 @@ BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") {
|
| BrokerState::~BrokerState() {
|
| }
|
|
|
| +void BrokerState::EnsureProcessesConnected(base::ProcessId pid1,
|
| + base::ProcessId pid2) {
|
| + lock_.AssertAcquired();
|
| + CHECK_NE(pid1, pid2);
|
| + CHECK_NE(pid2, base::GetCurrentProcId());
|
| + std::pair<base::ProcessId, base::ProcessId> processes;
|
| + processes.first = std::min(pid1, pid2);
|
| + processes.second = std::max(pid1, pid2);
|
| + if (connected_processes_.find(processes) != connected_processes_.end())
|
| + return;
|
| +
|
| + connected_processes_.insert(processes);
|
| + PlatformChannelPair channel_pair;
|
| + if (pid1 == base::GetCurrentProcId()) {
|
| + CHECK(child_channels_.find(pid2) == child_channels_.end());
|
| + CHECK(child_processes_.find(pid2) != child_processes_.end());
|
| + child_channels_[pid2] = new RoutedRawChannel(
|
| + channel_pair.PassServerHandle(),
|
| + base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
|
| + child_processes_[pid2]->ConnectToProcess(base::GetCurrentProcId(),
|
| + channel_pair.PassClientHandle());
|
| + return;
|
| + }
|
| +
|
| + CHECK(child_processes_.find(pid1) != child_processes_.end());
|
| + CHECK(child_processes_.find(pid2) != child_processes_.end());
|
| + child_processes_[pid1]->ConnectToProcess(pid2,
|
| + channel_pair.PassServerHandle());
|
| + child_processes_[pid2]->ConnectToProcess(pid1,
|
| + channel_pair.PassClientHandle());
|
| +}
|
| +
|
| +void BrokerState::ChannelDestructed(RoutedRawChannel* channel) {
|
| + base::AutoLock auto_lock(lock_);
|
| + for (auto it : child_channels_) {
|
| + if (it.second == channel) {
|
| + child_channels_.erase(it.first);
|
| + break;
|
| + }
|
| + }
|
| +}
|
| +
|
| } // namespace edk
|
| } // namespace mojo
|
|
|