Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1442)

Side by Side Diff: mojo/edk/system/broker_state.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/broker_state.h ('k') | mojo/edk/system/child_broker.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/broker_state.h" 5 #include "mojo/edk/system/broker_state.h"
6 6
7 #include "base/bind.h"
7 #include "base/rand_util.h" 8 #include "base/rand_util.h"
8 #include "mojo/edk/embedder/embedder_internal.h" 9 #include "mojo/edk/embedder/embedder_internal.h"
9 #include "mojo/edk/embedder/platform_channel_pair.h" 10 #include "mojo/edk/embedder/platform_channel_pair.h"
11 #include "mojo/edk/system/child_broker_host.h"
12 #include "mojo/edk/system/message_pipe_dispatcher.h"
13 #include "mojo/edk/system/routed_raw_channel.h"
10 14
11 namespace mojo { 15 namespace mojo {
12 namespace edk { 16 namespace edk {
13 17
14 BrokerState* BrokerState::GetInstance() { 18 BrokerState* BrokerState::GetInstance() {
15 return base::Singleton< 19 return base::Singleton<
16 BrokerState, base::LeakySingletonTraits<BrokerState>>::get(); 20 BrokerState, base::LeakySingletonTraits<BrokerState>>::get();
17 } 21 }
18 22
19 #if defined(OS_WIN) 23 #if defined(OS_WIN)
20 void BrokerState::CreatePlatformChannelPair( 24 void BrokerState::CreatePlatformChannelPair(
21 ScopedPlatformHandle* server, ScopedPlatformHandle* client) { 25 ScopedPlatformHandle* server, ScopedPlatformHandle* client) {
22 PlatformChannelPair channel_pair; 26 PlatformChannelPair channel_pair;
23 *server = channel_pair.PassServerHandle(); 27 *server = channel_pair.PassServerHandle();
24 *client = channel_pair.PassClientHandle(); 28 *client = channel_pair.PassClientHandle();
25 } 29 }
26 30
27 void BrokerState::HandleToToken( 31 void BrokerState::HandleToToken(
28 const PlatformHandle* platform_handles, 32 const PlatformHandle* platform_handles,
29 size_t count, 33 size_t count,
30 uint64_t* tokens) { 34 uint64_t* tokens) {
31 base::AutoLock auto_locker(lock_); 35 base::AutoLock auto_locker(token_map_lock_);
32 for (size_t i = 0; i < count; ++i) { 36 for (size_t i = 0; i < count; ++i) {
33 if (platform_handles[i].is_valid()) { 37 if (platform_handles[i].is_valid()) {
34 uint64_t token; 38 uint64_t token;
35 do { 39 do {
36 token = base::RandUint64(); 40 token = base::RandUint64();
37 } while (!token || token_map_.find(token) != token_map_.end()); 41 } while (!token || token_map_.find(token) != token_map_.end());
38 tokens[i] = token; 42 tokens[i] = token;
39 token_map_[tokens[i]] = platform_handles[i].handle; 43 token_map_[tokens[i]] = platform_handles[i].handle;
40 } else { 44 } else {
41 DLOG(WARNING) << "BrokerState got invalid handle."; 45 DLOG(WARNING) << "BrokerState got invalid handle.";
42 tokens[i] = 0; 46 tokens[i] = 0;
43 } 47 }
44 } 48 }
45 } 49 }
46 50
47 void BrokerState::TokenToHandle(const uint64_t* tokens, 51 void BrokerState::TokenToHandle(const uint64_t* tokens,
48 size_t count, 52 size_t count,
49 PlatformHandle* handles) { 53 PlatformHandle* handles) {
50 base::AutoLock auto_locker(lock_); 54 base::AutoLock auto_locker(token_map_lock_);
51 for (size_t i = 0; i < count; ++i) { 55 for (size_t i = 0; i < count; ++i) {
52 auto it = token_map_.find(tokens[i]); 56 auto it = token_map_.find(tokens[i]);
53 if (it == token_map_.end()) { 57 if (it == token_map_.end()) {
54 DLOG(WARNING) << "TokenToHandle didn't find token."; 58 DLOG(WARNING) << "TokenToHandle didn't find token.";
55 } else { 59 } else {
56 handles[i].handle = it->second; 60 handles[i].handle = it->second;
57 token_map_.erase(it); 61 token_map_.erase(it);
58 } 62 }
59 } 63 }
60 } 64 }
61 #endif 65 #endif
62 66
63 BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") { 67 void BrokerState::ConnectMessagePipe(uint64_t pipe_id,
64 base::Thread::Options options(base::MessageLoop::TYPE_IO, 0); 68 MessagePipeDispatcher* message_pipe) {
65 broker_thread_.StartWithOptions(options); 69 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
70 base::AutoLock auto_lock(lock_);
71 if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
72 // Both ends of the message pipe are in this process.
73 if (!in_process_pipes_channel1_) {
74 PlatformChannelPair channel_pair;
75 in_process_pipes_channel1_ = new RoutedRawChannel(
76 channel_pair.PassServerHandle(),
77 base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
78 in_process_pipes_channel2_ = new RoutedRawChannel(
79 channel_pair.PassClientHandle(),
80 base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
81 }
82
83 connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_;
84 connected_pipes_[message_pipe] = in_process_pipes_channel2_;
85 in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]);
86 in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe);
87 pending_connects_[pipe_id]->GotNonTransferableChannel(
88 in_process_pipes_channel1_->channel());
89 message_pipe->GotNonTransferableChannel(
90 in_process_pipes_channel2_->channel());
91
92 pending_connects_.erase(pipe_id);
93 return;
94 }
95
96 if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
97 // A child process has already tried to connect.
98 EnsureProcessesConnected(base::GetCurrentProcId(),
99 pending_child_connects_[pipe_id]->GetProcessId());
100 pending_child_connects_[pipe_id]->ConnectMessagePipe(
101 pipe_id, base::GetCurrentProcId());
102 base::ProcessId peer_pid = pending_child_connects_[pipe_id]->GetProcessId();
103 pending_child_connects_.erase(pipe_id);
104 connected_pipes_[message_pipe] = child_channels_[peer_pid];
105 child_channels_[peer_pid]->AddRoute(pipe_id, message_pipe);
106 message_pipe->GotNonTransferableChannel(
107 child_channels_[peer_pid]->channel());
108 return;
109 }
110
111 pending_connects_[pipe_id] = message_pipe;
112 }
113
114 void BrokerState::CloseMessagePipe(uint64_t pipe_id,
115 MessagePipeDispatcher* message_pipe) {
116 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
117 base::AutoLock auto_lock(lock_);
118
119 CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end());
120 connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe);
121 connected_pipes_.erase(message_pipe);
122 }
123
124 void BrokerState::ChildBrokerHostCreated(ChildBrokerHost* child_broker_host) {
125 base::AutoLock auto_lock(lock_);
126 CHECK(child_processes_.find(child_broker_host->GetProcessId()) ==
127 child_processes_.end());
128 child_processes_[child_broker_host->GetProcessId()] = child_broker_host;
129 }
130
131 void BrokerState::ChildBrokerHostDestructed(
132 ChildBrokerHost* child_broker_host) {
133 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
134 base::AutoLock auto_lock(lock_);
135
136 for (auto it = pending_child_connects_.begin();
137 it != pending_child_connects_.end();) {
138 if (it->second == child_broker_host) {
139 // Since we can't do it = pending_child_connects_.erase(it); until
140 // hash_map uses unordered_map on posix.
141 auto cur = it++;
142 pending_child_connects_.erase(cur);
143 } else {
144 it++;
145 }
146 }
147
148 base::ProcessId pid = child_broker_host->GetProcessId();
149 for (auto it = connected_processes_.begin();
150 it != connected_processes_.end();) {
151 if ((*it).first == pid || (*it).second == pid) {
152 // Since we can't do it = pending_child_connects_.erase(it); until
153 // hash_map uses unordered_map on posix.
154 auto cur = it++;
155 connected_processes_.erase(cur);
156 } else {
157 it++;
158 }
159 }
160
161 CHECK(child_processes_.find(pid) != child_processes_.end());
162 child_processes_.erase(pid);
163 }
164
165 void BrokerState::HandleConnectMessagePipe(ChildBrokerHost* pipe_process,
166 uint64_t pipe_id) {
167 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
168 base::AutoLock auto_lock(lock_);
169 if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
170 // Another child process is waiting to connect to the given pipe.
171 ChildBrokerHost* pending_pipe_process = pending_child_connects_[pipe_id];
172 EnsureProcessesConnected(pipe_process->GetProcessId(),
173 pending_pipe_process->GetProcessId());
174 pending_pipe_process->ConnectMessagePipe(
175 pipe_id, pipe_process->GetProcessId());
176 pipe_process->ConnectMessagePipe(
177 pipe_id, pending_pipe_process->GetProcessId());
178 pending_child_connects_.erase(pipe_id);
179 return;
180 }
181
182 if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
183 // This parent process is the other side of the given pipe.
184 EnsureProcessesConnected(base::GetCurrentProcId(),
185 pipe_process->GetProcessId());
186 MessagePipeDispatcher* pending_pipe = pending_connects_[pipe_id];
187 connected_pipes_[pending_pipe] =
188 child_channels_[pipe_process->GetProcessId()];
189 child_channels_[pipe_process->GetProcessId()]->AddRoute(
190 pipe_id, pending_pipe);
191 pending_pipe->GotNonTransferableChannel(
192 child_channels_[pipe_process->GetProcessId()]->channel());
193 pipe_process->ConnectMessagePipe(
194 pipe_id, base::GetCurrentProcId());
195 pending_connects_.erase(pipe_id);
196 return;
197 }
198
199 // This is the first connection request for pipe_id to reach the parent.
200 pending_child_connects_[pipe_id] = pipe_process;
201 }
202
203 void BrokerState::HandleCancelConnectMessagePipe(uint64_t pipe_id) {
204 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
205 base::AutoLock auto_lock(lock_);
206 if (pending_child_connects_.find(pipe_id) == pending_child_connects_.end()) {
207 NOTREACHED() << "Can't find entry for pipe_id " << pipe_id;
208 } else {
209 pending_child_connects_.erase(pipe_id);
210 }
211 }
212
213 BrokerState::BrokerState()
214 : in_process_pipes_channel1_(nullptr),
215 in_process_pipes_channel2_(nullptr) {
66 DCHECK(!internal::g_broker); 216 DCHECK(!internal::g_broker);
67 internal::g_broker = this; 217 internal::g_broker = this;
68 } 218 }
69 219
70 BrokerState::~BrokerState() { 220 BrokerState::~BrokerState() {
71 } 221 }
72 222
223 void BrokerState::EnsureProcessesConnected(base::ProcessId pid1,
224 base::ProcessId pid2) {
225 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
226 lock_.AssertAcquired();
227 CHECK_NE(pid1, pid2);
228 CHECK_NE(pid2, base::GetCurrentProcId());
229 std::pair<base::ProcessId, base::ProcessId> processes;
230 processes.first = std::min(pid1, pid2);
231 processes.second = std::max(pid1, pid2);
232 if (connected_processes_.find(processes) != connected_processes_.end())
233 return;
234
235 connected_processes_.insert(processes);
236 PlatformChannelPair channel_pair;
237 if (pid1 == base::GetCurrentProcId()) {
238 CHECK(child_channels_.find(pid2) == child_channels_.end());
239 CHECK(child_processes_.find(pid2) != child_processes_.end());
240 child_channels_[pid2] = new RoutedRawChannel(
241 channel_pair.PassServerHandle(),
242 base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
243 child_processes_[pid2]->ConnectToProcess(base::GetCurrentProcId(),
244 channel_pair.PassClientHandle());
245 return;
246 }
247
248 CHECK(child_processes_.find(pid1) != child_processes_.end());
249 CHECK(child_processes_.find(pid2) != child_processes_.end());
250 child_processes_[pid1]->ConnectToProcess(pid2,
251 channel_pair.PassServerHandle());
252 child_processes_[pid2]->ConnectToProcess(pid1,
253 channel_pair.PassClientHandle());
254 }
255
256 void BrokerState::ChannelDestructed(RoutedRawChannel* channel) {
257 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
258 base::AutoLock auto_lock(lock_);
259 for (auto it : child_channels_) {
260 if (it.second == channel) {
261 child_channels_.erase(it.first);
262 break;
263 }
264 }
265 }
266
73 } // namespace edk 267 } // namespace edk
74 } // namespace mojo 268 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/broker_state.h ('k') | mojo/edk/system/child_broker.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698