Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: mojo/edk/system/child_broker.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/child_broker.h ('k') | mojo/edk/system/child_broker_host.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/child_broker.h" 5 #include "mojo/edk/system/child_broker.h"
6 6
7 #include "base/bind.h"
7 #include "base/logging.h" 8 #include "base/logging.h"
8 #include "mojo/edk/embedder/embedder_internal.h" 9 #include "mojo/edk/embedder/embedder_internal.h"
10 #include "mojo/edk/embedder/platform_channel_pair.h"
9 #include "mojo/edk/system/broker_messages.h" 11 #include "mojo/edk/system/broker_messages.h"
12 #include "mojo/edk/system/message_pipe_dispatcher.h"
13 #include "mojo/edk/system/routed_raw_channel.h"
10 14
11 namespace mojo { 15 namespace mojo {
12 namespace edk { 16 namespace edk {
13 17
14 ChildBroker* ChildBroker::GetInstance() { 18 ChildBroker* ChildBroker::GetInstance() {
15 return base::Singleton< 19 return base::Singleton<
16 ChildBroker, base::LeakySingletonTraits<ChildBroker>>::get(); 20 ChildBroker, base::LeakySingletonTraits<ChildBroker>>::get();
17 } 21 }
18 22
19 void ChildBroker::SetChildBrokerHostHandle(ScopedPlatformHandle handle) { 23 void ChildBroker::SetChildBrokerHostHandle(ScopedPlatformHandle handle) {
20 handle_ = handle.Pass(); 24 ScopedPlatformHandle parent_async_channel_handle;
25 #if defined(OS_POSIX)
26 parent_async_channel_handle = handle.Pass();
27 #else
28 // On Windows we have two pipes to the parent. The first is for the token
29 // exchange for creating and passing handles, since the child needs the
30 // parent's help if it is sandboxed. The second is the same as POSIX, which is
31 // used for multiplexing related messages. So on Windows, we send the second
32 // pipe as the first string over the first one.
33 parent_sync_channel_ = handle.Pass();
34
35 HANDLE parent_handle = INVALID_HANDLE_VALUE;
36 DWORD bytes_read = 0;
37 BOOL rv = ReadFile(parent_sync_channel_.get().handle, &parent_handle,
38 sizeof(parent_handle), &bytes_read, NULL);
39 CHECK(rv);
40 parent_async_channel_handle.reset(PlatformHandle(parent_handle));
41 #endif
42
43 parent_async_channel_ =
44 RawChannel::Create(parent_async_channel_handle.Pass());
45 internal::g_io_thread_task_runner->PostTask(
46 FROM_HERE,
47 base::Bind(&RawChannel::Init, base::Unretained(parent_async_channel_),
48 this));
49
21 lock_.Unlock(); 50 lock_.Unlock();
22 } 51 }
23 52
24 #if defined(OS_WIN) 53 #if defined(OS_WIN)
25 void ChildBroker::CreatePlatformChannelPair( 54 void ChildBroker::CreatePlatformChannelPair(
26 ScopedPlatformHandle* server, ScopedPlatformHandle* client) { 55 ScopedPlatformHandle* server, ScopedPlatformHandle* client) {
27 BrokerMessage message; 56 lock_.Lock();
28 message.size = kBrokerMessageHeaderSize; 57 CreatePlatformChannelPairNoLock(server, client);
29 message.id = CREATE_PLATFORM_CHANNEL_PAIR; 58 lock_.Unlock();
30
31 uint32_t response_size = 2 * sizeof(HANDLE);
32 HANDLE handles[2];
33 if (WriteAndReadResponse(&message, handles, response_size)) {
34 server->reset(PlatformHandle(handles[0]));
35 client->reset(PlatformHandle(handles[1]));
36 }
37 } 59 }
38 60
39 void ChildBroker::HandleToToken(const PlatformHandle* platform_handles, 61 void ChildBroker::HandleToToken(const PlatformHandle* platform_handles,
40 size_t count, 62 size_t count,
41 uint64_t* tokens) { 63 uint64_t* tokens) {
42 uint32_t size = kBrokerMessageHeaderSize + 64 uint32_t size = kBrokerMessageHeaderSize +
43 static_cast<int>(count) * sizeof(HANDLE); 65 static_cast<int>(count) * sizeof(HANDLE);
44 std::vector<char> message_buffer(size); 66 std::vector<char> message_buffer(size);
45 BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&message_buffer[0]); 67 BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&message_buffer[0]);
46 message->size = size; 68 message->size = size;
47 message->id = HANDLE_TO_TOKEN; 69 message->id = HANDLE_TO_TOKEN;
48 for (size_t i = 0; i < count; ++i) 70 for (size_t i = 0; i < count; ++i)
49 message->handles[i] = platform_handles[i].handle; 71 message->handles[i] = platform_handles[i].handle;
50 72
51 uint32_t response_size = static_cast<int>(count) * sizeof(uint64_t); 73 uint32_t response_size = static_cast<int>(count) * sizeof(uint64_t);
74 lock_.Lock();
52 WriteAndReadResponse(message, tokens, response_size); 75 WriteAndReadResponse(message, tokens, response_size);
76 lock_.Unlock();
53 } 77 }
54 78
55 void ChildBroker::TokenToHandle(const uint64_t* tokens, 79 void ChildBroker::TokenToHandle(const uint64_t* tokens,
56 size_t count, 80 size_t count,
57 PlatformHandle* handles) { 81 PlatformHandle* handles) {
58 uint32_t size = kBrokerMessageHeaderSize + 82 uint32_t size = kBrokerMessageHeaderSize +
59 static_cast<int>(count) * sizeof(uint64_t); 83 static_cast<int>(count) * sizeof(uint64_t);
60 std::vector<char> message_buffer(size); 84 std::vector<char> message_buffer(size);
61 BrokerMessage* message = 85 BrokerMessage* message =
62 reinterpret_cast<BrokerMessage*>(&message_buffer[0]); 86 reinterpret_cast<BrokerMessage*>(&message_buffer[0]);
63 message->size = size; 87 message->size = size;
64 message->id = TOKEN_TO_HANDLE; 88 message->id = TOKEN_TO_HANDLE;
65 memcpy(&message->tokens[0], tokens, count * sizeof(uint64_t)); 89 memcpy(&message->tokens[0], tokens, count * sizeof(uint64_t));
66 90
67 std::vector<HANDLE> handles_temp(count); 91 std::vector<HANDLE> handles_temp(count);
68 uint32_t response_size = 92 uint32_t response_size =
69 static_cast<uint32_t>(handles_temp.size()) * sizeof(HANDLE); 93 static_cast<uint32_t>(handles_temp.size()) * sizeof(HANDLE);
94 lock_.Lock();
70 if (WriteAndReadResponse(message, &handles_temp[0], response_size)) { 95 if (WriteAndReadResponse(message, &handles_temp[0], response_size)) {
71 for (uint32_t i = 0; i < count; ++i) 96 for (uint32_t i = 0; i < count; ++i)
72 handles[i].handle = handles_temp[i]; 97 handles[i].handle = handles_temp[i];
98 lock_.Unlock();
73 } 99 }
74 } 100 }
75 #endif 101 #endif
76 102
77 ChildBroker::ChildBroker() { 103 void ChildBroker::ConnectMessagePipe(uint64_t pipe_id,
104 MessagePipeDispatcher* message_pipe) {
105 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
106 lock_.Lock();
107
108 ConnectMessagePipeMessage data;
109 data.pipe_id = pipe_id;
110 if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
111 // Both ends of the message pipe are in the same process.
112 // First, tell the browser side that to remove its bookkeeping for a pending
113 // connect, since it'll never get the other side.
114
115 data.type = CANCEL_CONNECT_MESSAGE_PIPE;
116 scoped_ptr<MessageInTransit> message(new MessageInTransit(
117 MessageInTransit::Type::MESSAGE, sizeof(data), &data));
118 parent_async_channel_->WriteMessage(message.Pass());
119
120 if (!in_process_pipes_channel1_) {
121 ScopedPlatformHandle server_handle, client_handle;
122 #if defined(OS_WIN)
123 CreatePlatformChannelPairNoLock(&server_handle, &client_handle);
124 #else
125 PlatformChannelPair channel_pair;
126 server_handle = channel_pair.PassServerHandle();
127 client_handle = channel_pair.PassClientHandle();
128 #endif
129 in_process_pipes_channel1_ = new RoutedRawChannel(
130 server_handle.Pass(),
131 base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
132 in_process_pipes_channel2_ = new RoutedRawChannel(
133 client_handle.Pass(),
134 base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
135 }
136
137 connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_;
138 connected_pipes_[message_pipe] = in_process_pipes_channel2_;
139 in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]);
140 in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe);
141 pending_connects_[pipe_id]->GotNonTransferableChannel(
142 in_process_pipes_channel1_->channel());
143 message_pipe->GotNonTransferableChannel(
144 in_process_pipes_channel2_->channel());
145
146 pending_connects_.erase(pipe_id);
147 lock_.Unlock();
148 return;
149 }
150
151 data.type = CONNECT_MESSAGE_PIPE;
152 scoped_ptr<MessageInTransit> message(new MessageInTransit(
153 MessageInTransit::Type::MESSAGE, sizeof(data), &data));
154 pending_connects_[pipe_id] = message_pipe;
155 parent_async_channel_->WriteMessage(message.Pass());
156
157 lock_.Unlock();
158 }
159
160 void ChildBroker::CloseMessagePipe(
161 uint64_t pipe_id, MessagePipeDispatcher* message_pipe) {
162 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
163 lock_.Lock();
164 CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end());
165 connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe);
166 connected_pipes_.erase(message_pipe);
167 lock_.Unlock();
168 }
169
170 ChildBroker::ChildBroker()
171 : in_process_pipes_channel1_(nullptr),
172 in_process_pipes_channel2_(nullptr) {
78 DCHECK(!internal::g_broker); 173 DCHECK(!internal::g_broker);
79 internal::g_broker = this; 174 internal::g_broker = this;
80 // Block any threads from calling this until we have a pipe to the parent. 175 // Block any threads from calling this until we have a pipe to the parent.
81 lock_.Lock(); 176 lock_.Lock();
82 } 177 }
83 178
84 ChildBroker::~ChildBroker() { 179 ChildBroker::~ChildBroker() {
85 } 180 }
86 181
182 void ChildBroker::OnReadMessage(
183 const MessageInTransit::View& message_view,
184 ScopedPlatformHandleVectorPtr platform_handles) {
185 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
186 lock_.Lock();
187 MultiplexMessages type =
188 *static_cast<const MultiplexMessages*>(message_view.bytes());
189 if (type == CONNECT_TO_PROCESS) {
190 DCHECK_EQ(platform_handles->size(), 1u);
191 ScopedPlatformHandle handle((*platform_handles.get())[0]);
192 (*platform_handles.get())[0] = PlatformHandle();
193
194 const ConnectToProcessMessage* message =
195 static_cast<const ConnectToProcessMessage*>(message_view.bytes());
196
197 CHECK(channels_.find(message->process_id) == channels_.end());
198 channels_[message->process_id] = new RoutedRawChannel(
199 handle.Pass(),
200 base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
201 } else if (type == PEER_PIPE_CONNECTED) {
202 DCHECK(!platform_handles);
203 const PeerPipeConnectedMessage* message =
204 static_cast<const PeerPipeConnectedMessage*>(message_view.bytes());
205
206 uint64_t pipe_id = message->pipe_id;
207 uint64_t peer_pid = message->process_id;
208
209 CHECK(pending_connects_.find(pipe_id) != pending_connects_.end());
210 MessagePipeDispatcher* pipe = pending_connects_[pipe_id];
211 pending_connects_.erase(pipe_id);
212 if (channels_.find(peer_pid) == channels_.end()) {
213 // We saw the peer process die before we got the reply from the parent.
214 pipe->OnError(ERROR_READ_SHUTDOWN);
215 } else {
216 CHECK(connected_pipes_.find(pipe) == connected_pipes_.end());
217 connected_pipes_[pipe] = channels_[peer_pid];
218 channels_[peer_pid]->AddRoute(pipe_id, pipe);
219 pipe->GotNonTransferableChannel(channels_[peer_pid]->channel());
220 }
221 } else {
222 NOTREACHED();
223 }
224
225 lock_.Unlock();
226 }
227
228 void ChildBroker::OnError(Error error) {
229 // The parent process shut down.
230 }
231
232 void ChildBroker::ChannelDestructed(RoutedRawChannel* channel) {
233 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
234 lock_.Lock();
235 for (auto it : channels_) {
236 if (it.second == channel) {
237 channels_.erase(it.first);
238 break;
239 }
240 }
241 lock_.Unlock();
242 }
243
244 #if defined(OS_WIN)
245
87 bool ChildBroker::WriteAndReadResponse(BrokerMessage* message, 246 bool ChildBroker::WriteAndReadResponse(BrokerMessage* message,
88 void* response, 247 void* response,
89 uint32_t response_size) { 248 uint32_t response_size) {
90 lock_.Lock(); 249 CHECK(parent_sync_channel_.is_valid());
91 CHECK(handle_.is_valid());
92 250
93 bool result = true; 251 bool result = true;
94 #if defined(OS_WIN)
95 DWORD bytes_written = 0; 252 DWORD bytes_written = 0;
96 // This will always write in one chunk per 253 // This will always write in one chunk per
97 // https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150.aspx. 254 // https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150.aspx.
98 BOOL rv = WriteFile(handle_.get().handle, message, message->size, 255 BOOL rv = WriteFile(parent_sync_channel_.get().handle, message, message->size,
99 &bytes_written, NULL); 256 &bytes_written, NULL);
100 if (!rv || bytes_written != message->size) { 257 if (!rv || bytes_written != message->size) {
101 LOG(ERROR) << "Child token serializer couldn't write message."; 258 LOG(ERROR) << "Child token serializer couldn't write message.";
102 result = false; 259 result = false;
103 } else { 260 } else {
104 while (response_size) { 261 while (response_size) {
105 DWORD bytes_read = 0; 262 DWORD bytes_read = 0;
106 rv = ReadFile(handle_.get().handle, response, response_size, &bytes_read, 263 rv = ReadFile(parent_sync_channel_.get().handle, response, response_size,
107 NULL); 264 &bytes_read, NULL);
108 if (!rv) { 265 if (!rv) {
109 LOG(ERROR) << "Child token serializer couldn't read result."; 266 LOG(ERROR) << "Child token serializer couldn't read result.";
110 result = false; 267 result = false;
111 break; 268 break;
112 } 269 }
113 response_size -= bytes_read; 270 response_size -= bytes_read;
114 response = static_cast<char*>(response) + bytes_read; 271 response = static_cast<char*>(response) + bytes_read;
115 } 272 }
116 } 273 }
117 #endif
118
119 lock_.Unlock();
120 274
121 return result; 275 return result;
122 } 276 }
123 277
278 void ChildBroker::CreatePlatformChannelPairNoLock(
279 ScopedPlatformHandle* server, ScopedPlatformHandle* client) {
280 BrokerMessage message;
281 message.size = kBrokerMessageHeaderSize;
282 message.id = CREATE_PLATFORM_CHANNEL_PAIR;
283
284 uint32_t response_size = 2 * sizeof(HANDLE);
285 HANDLE handles[2];
286 if (WriteAndReadResponse(&message, handles, response_size)) {
287 server->reset(PlatformHandle(handles[0]));
288 client->reset(PlatformHandle(handles[1]));
289 }
290 }
291
292 #endif
293
124 } // namespace edk 294 } // namespace edk
125 } // namespace mojo 295 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/child_broker.h ('k') | mojo/edk/system/child_broker_host.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698