Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(915)

Unified Diff: mojo/edk/system/broker_state.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/broker_state.h ('k') | mojo/edk/system/child_broker.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/broker_state.cc
diff --git a/mojo/edk/system/broker_state.cc b/mojo/edk/system/broker_state.cc
index 6eb4ae94e3f61989dbf02f2a8a26c7ae82f137f1..b875b31a24a72e632310de7a9ebfe648ddff71ee 100644
--- a/mojo/edk/system/broker_state.cc
+++ b/mojo/edk/system/broker_state.cc
@@ -4,9 +4,13 @@
#include "mojo/edk/system/broker_state.h"
+#include "base/bind.h"
#include "base/rand_util.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
+#include "mojo/edk/system/child_broker_host.h"
+#include "mojo/edk/system/message_pipe_dispatcher.h"
+#include "mojo/edk/system/routed_raw_channel.h"
namespace mojo {
namespace edk {
@@ -28,7 +32,7 @@ void BrokerState::HandleToToken(
const PlatformHandle* platform_handles,
size_t count,
uint64_t* tokens) {
- base::AutoLock auto_locker(lock_);
+ base::AutoLock auto_locker(token_map_lock_);
for (size_t i = 0; i < count; ++i) {
if (platform_handles[i].is_valid()) {
uint64_t token;
@@ -47,7 +51,7 @@ void BrokerState::HandleToToken(
void BrokerState::TokenToHandle(const uint64_t* tokens,
size_t count,
PlatformHandle* handles) {
- base::AutoLock auto_locker(lock_);
+ base::AutoLock auto_locker(token_map_lock_);
for (size_t i = 0; i < count; ++i) {
auto it = token_map_.find(tokens[i]);
if (it == token_map_.end()) {
@@ -60,9 +64,155 @@ void BrokerState::TokenToHandle(const uint64_t* tokens,
}
#endif
-BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") {
- base::Thread::Options options(base::MessageLoop::TYPE_IO, 0);
- broker_thread_.StartWithOptions(options);
+void BrokerState::ConnectMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
+ // Both ends of the message pipe are in this process.
+ if (!in_process_pipes_channel1_) {
+ PlatformChannelPair channel_pair;
+ in_process_pipes_channel1_ = new RoutedRawChannel(
+ channel_pair.PassServerHandle(),
+ base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
+ in_process_pipes_channel2_ = new RoutedRawChannel(
+ channel_pair.PassClientHandle(),
+ base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
+ }
+
+ connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_;
+ connected_pipes_[message_pipe] = in_process_pipes_channel2_;
+ in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]);
+ in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe);
+ pending_connects_[pipe_id]->GotNonTransferableChannel(
+ in_process_pipes_channel1_->channel());
+ message_pipe->GotNonTransferableChannel(
+ in_process_pipes_channel2_->channel());
+
+ pending_connects_.erase(pipe_id);
+ return;
+ }
+
+ if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
+ // A child process has already tried to connect.
+ EnsureProcessesConnected(base::GetCurrentProcId(),
+ pending_child_connects_[pipe_id]->GetProcessId());
+ pending_child_connects_[pipe_id]->ConnectMessagePipe(
+ pipe_id, base::GetCurrentProcId());
+ base::ProcessId peer_pid = pending_child_connects_[pipe_id]->GetProcessId();
+ pending_child_connects_.erase(pipe_id);
+ connected_pipes_[message_pipe] = child_channels_[peer_pid];
+ child_channels_[peer_pid]->AddRoute(pipe_id, message_pipe);
+ message_pipe->GotNonTransferableChannel(
+ child_channels_[peer_pid]->channel());
+ return;
+ }
+
+ pending_connects_[pipe_id] = message_pipe;
+}
+
+void BrokerState::CloseMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+
+ CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end());
+ connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe);
+ connected_pipes_.erase(message_pipe);
+}
+
+void BrokerState::ChildBrokerHostCreated(ChildBrokerHost* child_broker_host) {
+ base::AutoLock auto_lock(lock_);
+ CHECK(child_processes_.find(child_broker_host->GetProcessId()) ==
+ child_processes_.end());
+ child_processes_[child_broker_host->GetProcessId()] = child_broker_host;
+}
+
+void BrokerState::ChildBrokerHostDestructed(
+ ChildBrokerHost* child_broker_host) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+
+ for (auto it = pending_child_connects_.begin();
+ it != pending_child_connects_.end();) {
+ if (it->second == child_broker_host) {
+ // Since we can't do it = pending_child_connects_.erase(it); until
+ // hash_map uses unordered_map on posix.
+ auto cur = it++;
+ pending_child_connects_.erase(cur);
+ } else {
+ it++;
+ }
+ }
+
+ base::ProcessId pid = child_broker_host->GetProcessId();
+ for (auto it = connected_processes_.begin();
+ it != connected_processes_.end();) {
+ if ((*it).first == pid || (*it).second == pid) {
+ // Since we can't do it = pending_child_connects_.erase(it); until
+ // hash_map uses unordered_map on posix.
+ auto cur = it++;
+ connected_processes_.erase(cur);
+ } else {
+ it++;
+ }
+ }
+
+ CHECK(child_processes_.find(pid) != child_processes_.end());
+ child_processes_.erase(pid);
+}
+
+void BrokerState::HandleConnectMessagePipe(ChildBrokerHost* pipe_process,
+ uint64_t pipe_id) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
+ // Another child process is waiting to connect to the given pipe.
+ ChildBrokerHost* pending_pipe_process = pending_child_connects_[pipe_id];
+ EnsureProcessesConnected(pipe_process->GetProcessId(),
+ pending_pipe_process->GetProcessId());
+ pending_pipe_process->ConnectMessagePipe(
+ pipe_id, pipe_process->GetProcessId());
+ pipe_process->ConnectMessagePipe(
+ pipe_id, pending_pipe_process->GetProcessId());
+ pending_child_connects_.erase(pipe_id);
+ return;
+ }
+
+ if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
+ // This parent process is the other side of the given pipe.
+ EnsureProcessesConnected(base::GetCurrentProcId(),
+ pipe_process->GetProcessId());
+ MessagePipeDispatcher* pending_pipe = pending_connects_[pipe_id];
+ connected_pipes_[pending_pipe] =
+ child_channels_[pipe_process->GetProcessId()];
+ child_channels_[pipe_process->GetProcessId()]->AddRoute(
+ pipe_id, pending_pipe);
+ pending_pipe->GotNonTransferableChannel(
+ child_channels_[pipe_process->GetProcessId()]->channel());
+ pipe_process->ConnectMessagePipe(
+ pipe_id, base::GetCurrentProcId());
+ pending_connects_.erase(pipe_id);
+ return;
+ }
+
+ // This is the first connection request for pipe_id to reach the parent.
+ pending_child_connects_[pipe_id] = pipe_process;
+}
+
+void BrokerState::HandleCancelConnectMessagePipe(uint64_t pipe_id) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ if (pending_child_connects_.find(pipe_id) == pending_child_connects_.end()) {
+ NOTREACHED() << "Can't find entry for pipe_id " << pipe_id;
+ } else {
+ pending_child_connects_.erase(pipe_id);
+ }
+}
+
+BrokerState::BrokerState()
+ : in_process_pipes_channel1_(nullptr),
+ in_process_pipes_channel2_(nullptr) {
DCHECK(!internal::g_broker);
internal::g_broker = this;
}
@@ -70,5 +220,49 @@ BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") {
BrokerState::~BrokerState() {
}
+void BrokerState::EnsureProcessesConnected(base::ProcessId pid1,
+ base::ProcessId pid2) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.AssertAcquired();
+ CHECK_NE(pid1, pid2);
+ CHECK_NE(pid2, base::GetCurrentProcId());
+ std::pair<base::ProcessId, base::ProcessId> processes;
+ processes.first = std::min(pid1, pid2);
+ processes.second = std::max(pid1, pid2);
+ if (connected_processes_.find(processes) != connected_processes_.end())
+ return;
+
+ connected_processes_.insert(processes);
+ PlatformChannelPair channel_pair;
+ if (pid1 == base::GetCurrentProcId()) {
+ CHECK(child_channels_.find(pid2) == child_channels_.end());
+ CHECK(child_processes_.find(pid2) != child_processes_.end());
+ child_channels_[pid2] = new RoutedRawChannel(
+ channel_pair.PassServerHandle(),
+ base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
+ child_processes_[pid2]->ConnectToProcess(base::GetCurrentProcId(),
+ channel_pair.PassClientHandle());
+ return;
+ }
+
+ CHECK(child_processes_.find(pid1) != child_processes_.end());
+ CHECK(child_processes_.find(pid2) != child_processes_.end());
+ child_processes_[pid1]->ConnectToProcess(pid2,
+ channel_pair.PassServerHandle());
+ child_processes_[pid2]->ConnectToProcess(pid1,
+ channel_pair.PassClientHandle());
+}
+
+void BrokerState::ChannelDestructed(RoutedRawChannel* channel) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ for (auto it : child_channels_) {
+ if (it.second == channel) {
+ child_channels_.erase(it.first);
+ break;
+ }
+ }
+}
+
} // namespace edk
} // namespace mojo
« no previous file with comments | « mojo/edk/system/broker_state.h ('k') | mojo/edk/system/child_broker.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698