Chromium Code Reviews| Index: mojo/edk/system/broker_state.cc |
| diff --git a/mojo/edk/system/broker_state.cc b/mojo/edk/system/broker_state.cc |
| index 6eb4ae94e3f61989dbf02f2a8a26c7ae82f137f1..888d5fd00ba5482f0df8543e7b6f27cdb7f6bc2f 100644 |
| --- a/mojo/edk/system/broker_state.cc |
| +++ b/mojo/edk/system/broker_state.cc |
| @@ -4,9 +4,13 @@ |
| #include "mojo/edk/system/broker_state.h" |
| +#include "base/bind.h" |
| #include "base/rand_util.h" |
| #include "mojo/edk/embedder/embedder_internal.h" |
| #include "mojo/edk/embedder/platform_channel_pair.h" |
| +#include "mojo/edk/system/child_broker_host.h" |
| +#include "mojo/edk/system/message_pipe_dispatcher.h" |
| +#include "mojo/edk/system/routed_raw_channel.h" |
| namespace mojo { |
| namespace edk { |
| @@ -28,7 +32,7 @@ void BrokerState::HandleToToken( |
| const PlatformHandle* platform_handles, |
| size_t count, |
| uint64_t* tokens) { |
| - base::AutoLock auto_locker(lock_); |
| + base::AutoLock auto_locker(token_map_lock_); |
| for (size_t i = 0; i < count; ++i) { |
| if (platform_handles[i].is_valid()) { |
| uint64_t token; |
| @@ -47,7 +51,7 @@ void BrokerState::HandleToToken( |
| void BrokerState::TokenToHandle(const uint64_t* tokens, |
| size_t count, |
| PlatformHandle* handles) { |
| - base::AutoLock auto_locker(lock_); |
| + base::AutoLock auto_locker(token_map_lock_); |
| for (size_t i = 0; i < count; ++i) { |
| auto it = token_map_.find(tokens[i]); |
| if (it == token_map_.end()) { |
| @@ -60,9 +64,152 @@ void BrokerState::TokenToHandle(const uint64_t* tokens, |
| } |
| #endif |
| -BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") { |
| - base::Thread::Options options(base::MessageLoop::TYPE_IO, 0); |
| - broker_thread_.StartWithOptions(options); |
| +void BrokerState::ConnectMessagePipe(uint64_t pipe_id, |
| + MessagePipeDispatcher* message_pipe) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + base::AutoLock auto_lock(lock_); |
| + if (pending_connects_.find(pipe_id) != pending_connects_.end()) { |
| + // Both ends of the message pipe are in this process. |
| + if (!in_process_pipes_channel1_) { |
| + PlatformChannelPair channel_pair; |
| + in_process_pipes_channel1_ = new RoutedRawChannel( |
| + channel_pair.PassServerHandle(), |
| + base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this))); |
| + in_process_pipes_channel2_ = new RoutedRawChannel( |
| + channel_pair.PassClientHandle(), |
| + base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this))); |
| + } |
| + |
| + connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_; |
| + connected_pipes_[message_pipe] = in_process_pipes_channel2_; |
| + in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]); |
| + in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe); |
| + pending_connects_[pipe_id]->GotNonTransferableChannel( |
| + in_process_pipes_channel1_->channel()); |
| + message_pipe->GotNonTransferableChannel( |
| + in_process_pipes_channel2_->channel()); |
| + |
| + pending_connects_.erase(pipe_id); |
| + return; |
| + } |
| + |
| + if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) { |
| + // A child process has already tried to connect. |
| + EnsureProcessesConnected(base::GetCurrentProcId(), |
| + pending_child_connects_[pipe_id]->GetProcessId()); |
| + pending_child_connects_[pipe_id]->ConnectMessagePipe( |
| + pipe_id, base::GetCurrentProcId()); |
| + base::ProcessId peer_pid = pending_child_connects_[pipe_id]->GetProcessId(); |
| + pending_child_connects_.erase(pipe_id); |
| + connected_pipes_[message_pipe] = child_channels_[peer_pid]; |
| + child_channels_[peer_pid]->AddRoute(pipe_id, message_pipe); |
| + message_pipe->GotNonTransferableChannel( |
| + child_channels_[peer_pid]->channel()); |
| + return; |
| + } |
| + |
| + pending_connects_[pipe_id] = message_pipe; |
| +} |
| + |
| +void BrokerState::CloseMessagePipe(uint64_t pipe_id, |
| + MessagePipeDispatcher* message_pipe) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + base::AutoLock auto_lock(lock_); |
| + |
| + CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end()); |
|
Tom Sepez
2015/12/04 17:59:43
These checks are essential for safety; I'd hate fo
jam
2015/12/05 00:09:34
BrokerState is global state that is in the browser
|
| + connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe); |
| + connected_pipes_.erase(message_pipe); |
| +} |
| + |
| +void BrokerState::ChildBrokerHostCreated(ChildBrokerHost* child_broker_host) { |
| + base::AutoLock auto_lock(lock_); |
| + CHECK(child_processes_.find(child_broker_host->GetProcessId()) == |
| + child_processes_.end()); |
| + child_processes_[child_broker_host->GetProcessId()] = child_broker_host; |
| +} |
| + |
| +void BrokerState::ChildBrokerHostDestructed( |
| + ChildBrokerHost* child_broker_host) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + base::AutoLock auto_lock(lock_); |
| + |
| + for (auto it = pending_child_connects_.begin(); |
| + it != pending_child_connects_.end();) { |
| + if (it->second == child_broker_host) { |
| + // Since we can't do it = pending_child_connects_.erase(it); until |
| + // hash_map uses unordered_map on posix. |
| + auto cur = it++; |
| + pending_child_connects_.erase(cur); |
| + } else { |
| + it++; |
| + } |
| + } |
| + |
| + base::ProcessId pid = child_broker_host->GetProcessId(); |
| + for (auto it = connected_processes_.begin(); |
| + it != connected_processes_.end();) { |
| + if ((*it).first == pid || (*it).second == pid) { |
| + // Since we can't do it = pending_child_connects_.erase(it); until |
| + // hash_map uses unordered_map on posix. |
| + auto cur = it++; |
| + connected_processes_.erase(cur); |
| + } else { |
| + it++; |
| + } |
| + } |
| + |
| + CHECK(child_processes_.find(pid) != child_processes_.end()); |
| + child_processes_.erase(pid); |
| +} |
| + |
| +void BrokerState::HandleConnectMessagePipe(ChildBrokerHost* pipe_process, |
| + uint64_t pipe_id) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + base::AutoLock auto_lock(lock_); |
| + if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) { |
| + // Another child process is waiting to connect to the given pipe. |
| + ChildBrokerHost* pending_pipe_process = pending_child_connects_[pipe_id]; |
| + EnsureProcessesConnected(pipe_process->GetProcessId(), |
| + pending_pipe_process->GetProcessId()); |
| + pending_pipe_process->ConnectMessagePipe( |
| + pipe_id, pipe_process->GetProcessId()); |
| + pipe_process->ConnectMessagePipe( |
| + pipe_id, pending_pipe_process->GetProcessId()); |
| + pending_child_connects_.erase(pipe_id); |
| + return; |
| + } |
| + |
| + if (pending_connects_.find(pipe_id) != pending_connects_.end()) { |
| + // This parent process is the other side of the given pipe. |
| + EnsureProcessesConnected(base::GetCurrentProcId(), |
| + pipe_process->GetProcessId()); |
| + MessagePipeDispatcher* pending_pipe = pending_connects_[pipe_id]; |
| + connected_pipes_[pending_pipe] = |
| + child_channels_[pipe_process->GetProcessId()]; |
| + child_channels_[pipe_process->GetProcessId()]->AddRoute( |
| + pipe_id, pending_pipe); |
| + pending_pipe->GotNonTransferableChannel( |
| + child_channels_[pipe_process->GetProcessId()]->channel()); |
| + pipe_process->ConnectMessagePipe( |
| + pipe_id, base::GetCurrentProcId()); |
| + pending_connects_.erase(pipe_id); |
| + return; |
| + } |
| + |
| + // This is the first connection request for pipe_id to reach the parent. |
| + pending_child_connects_[pipe_id] = pipe_process; |
| +} |
| + |
| +void BrokerState::HandleCancelConnectMessagePipe(uint64_t pipe_id) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + base::AutoLock auto_lock(lock_); |
| + CHECK(pending_child_connects_.find(pipe_id) != pending_child_connects_.end()); |
| + pending_child_connects_.erase(pipe_id); |
| +} |
| + |
| +BrokerState::BrokerState() |
| + : in_process_pipes_channel1_(nullptr), |
| + in_process_pipes_channel2_(nullptr) { |
| DCHECK(!internal::g_broker); |
| internal::g_broker = this; |
| } |
| @@ -70,5 +217,49 @@ BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") { |
| BrokerState::~BrokerState() { |
| } |
| +void BrokerState::EnsureProcessesConnected(base::ProcessId pid1, |
| + base::ProcessId pid2) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + lock_.AssertAcquired(); |
| + CHECK_NE(pid1, pid2); |
| + CHECK_NE(pid2, base::GetCurrentProcId()); |
| + std::pair<base::ProcessId, base::ProcessId> processes; |
| + processes.first = std::min(pid1, pid2); |
| + processes.second = std::max(pid1, pid2); |
| + if (connected_processes_.find(processes) != connected_processes_.end()) |
| + return; |
| + |
| + connected_processes_.insert(processes); |
| + PlatformChannelPair channel_pair; |
| + if (pid1 == base::GetCurrentProcId()) { |
| + CHECK(child_channels_.find(pid2) == child_channels_.end()); |
| + CHECK(child_processes_.find(pid2) != child_processes_.end()); |
| + child_channels_[pid2] = new RoutedRawChannel( |
| + channel_pair.PassServerHandle(), |
| + base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this))); |
| + child_processes_[pid2]->ConnectToProcess(base::GetCurrentProcId(), |
| + channel_pair.PassClientHandle()); |
| + return; |
| + } |
| + |
| + CHECK(child_processes_.find(pid1) != child_processes_.end()); |
| + CHECK(child_processes_.find(pid2) != child_processes_.end()); |
| + child_processes_[pid1]->ConnectToProcess(pid2, |
| + channel_pair.PassServerHandle()); |
| + child_processes_[pid2]->ConnectToProcess(pid1, |
| + channel_pair.PassClientHandle()); |
| +} |
| + |
| +void BrokerState::ChannelDestructed(RoutedRawChannel* channel) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + base::AutoLock auto_lock(lock_); |
| + for (auto it : child_channels_) { |
| + if (it.second == channel) { |
| + child_channels_.erase(it.first); |
| + break; |
| + } |
| + } |
| +} |
| + |
| } // namespace edk |
| } // namespace mojo |