Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Side by Side Diff: mojo/edk/system/child_broker_host.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/child_broker_host.h ('k') | mojo/edk/system/core.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/child_broker_host.h" 5 #include "mojo/edk/system/child_broker_host.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/lazy_instance.h" 8 #include "base/lazy_instance.h"
9 #include "mojo/edk/embedder/embedder_internal.h"
9 #include "mojo/edk/embedder/platform_channel_pair.h" 10 #include "mojo/edk/embedder/platform_channel_pair.h"
10 #include "mojo/edk/system/broker_messages.h" 11 #include "mojo/edk/system/broker_messages.h"
11 #include "mojo/edk/system/broker_state.h" 12 #include "mojo/edk/system/broker_state.h"
12 #include "mojo/edk/system/configuration.h" 13 #include "mojo/edk/system/configuration.h"
14 #include "mojo/edk/system/core.h"
15 #include "mojo/edk/system/platform_handle_dispatcher.h"
13 16
14 namespace mojo { 17 namespace mojo {
15 namespace edk { 18 namespace edk {
16 19
17 namespace { 20 namespace {
21 #if defined(OS_WIN)
18 static const int kDefaultReadBufferSize = 256; 22 static const int kDefaultReadBufferSize = 256;
23 #endif
19 } 24 }
20 25
21 ChildBrokerHost::ChildBrokerHost(base::ProcessHandle child_process, 26 ChildBrokerHost::ChildBrokerHost(base::ProcessHandle child_process,
22 ScopedPlatformHandle pipe) 27 ScopedPlatformHandle pipe)
23 : child_process_(child_process), 28 : process_id_(base::GetProcId(child_process)) {
24 pipe_(pipe.Pass()), 29 ScopedPlatformHandle parent_async_channel_handle;
25 num_bytes_read_(0) { 30 #if defined(OS_POSIX)
26 #if defined(OS_WIN) 31 parent_async_channel_handle = pipe.Pass();
32 #else
33 child_process_ = child_process;
34 sync_channel_ = pipe.Pass();
27 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); 35 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
28 read_context_.handler = this; 36 read_context_.handler = this;
29 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); 37 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
30 write_context_.handler = this; 38 write_context_.handler = this;
31 #else 39 read_data_.resize(kDefaultReadBufferSize);
32 // TODO(jam) 40 num_bytes_read_ = 0;
33 (void)child_process_; // Suppress -Wunused-private-field. 41
34 (void)num_bytes_read_; // Suppress -Wunused-private-field. 42 // See comment in ChildBroker::SetChildBrokerHostHandle. Summary is we need
43 // two pipes on Windows, so send the second one over the first one.
44 PlatformChannelPair parent_pipe;
45 parent_async_channel_handle = parent_pipe.PassServerHandle();
46
47 HANDLE duplicated_child_handle =
48 DuplicateToChild(parent_pipe.PassClientHandle().release().handle);
49 BOOL rv = WriteFile(sync_channel_.get().handle,
50 &duplicated_child_handle, sizeof(duplicated_child_handle),
51 NULL, &write_context_.overlapped);
52 DCHECK(rv || GetLastError() == ERROR_IO_PENDING);
53
54 internal::g_io_thread_task_runner->PostTask(
55 FROM_HERE,
56 base::Bind(&ChildBrokerHost::RegisterIOHandler, base::Unretained(this)));
35 #endif 57 #endif
36 58
37 read_data_.resize(kDefaultReadBufferSize); 59 child_channel_ = RawChannel::Create(parent_async_channel_handle.Pass());
38 BrokerState::GetInstance()->broker_thread()->PostTask( 60 internal::g_io_thread_task_runner->PostTask(
39 FROM_HERE, 61 FROM_HERE,
40 base::Bind(&ChildBrokerHost::RegisterIOHandler, base::Unretained(this))); 62 base::Bind(&RawChannel::Init, base::Unretained(child_channel_), this));
63 internal::g_io_thread_task_runner->PostTask(
64 FROM_HERE,
65 base::Bind(&RawChannel::EnsureLazyInitialized,
66 base::Unretained(child_channel_)));
67
68 BrokerState::GetInstance()->ChildBrokerHostCreated(this);
69 }
70
71 base::ProcessId ChildBrokerHost::GetProcessId() {
72 return process_id_;
73 }
74
75 void ChildBrokerHost::ConnectToProcess(base::ProcessId process_id,
76 ScopedPlatformHandle pipe) {
77 if (!child_channel_)
78 return; // Can happen at process shutdown on Windows.
79 ConnectToProcessMessage data;
80 data.type = CONNECT_TO_PROCESS;
81 data.process_id = process_id;
82 scoped_ptr<MessageInTransit> message(new MessageInTransit(
83 MessageInTransit::Type::MESSAGE, sizeof(data), &data));
84 scoped_refptr<Dispatcher> dispatcher =
85 PlatformHandleDispatcher::Create(pipe.Pass());
86 internal::g_core->AddDispatcher(dispatcher);
87 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector);
88 dispatchers->push_back(dispatcher);
89 message->SetDispatchers(dispatchers.Pass());
90 message->SerializeAndCloseDispatchers();
91 child_channel_->WriteMessage(message.Pass());
92 }
93
94 void ChildBrokerHost::ConnectMessagePipe(uint64_t pipe_id,
95 base::ProcessId process_id) {
96 if (!child_channel_)
97 return; // Can happen at process shutdown on Windows.
98 PeerPipeConnectedMessage data;
99 data.type = PEER_PIPE_CONNECTED;
100 data.pipe_id = pipe_id;
101 data.process_id = process_id;
102 scoped_ptr<MessageInTransit> message(new MessageInTransit(
103 MessageInTransit::Type::MESSAGE, sizeof(data), &data));
104 child_channel_->WriteMessage(message.Pass());
41 } 105 }
42 106
43 ChildBrokerHost::~ChildBrokerHost() { 107 ChildBrokerHost::~ChildBrokerHost() {
108 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
109 BrokerState::GetInstance()->ChildBrokerHostDestructed(this);
110 if (child_channel_)
111 child_channel_->Shutdown();
44 } 112 }
45 113
46 void ChildBrokerHost::RegisterIOHandler() { 114 void ChildBrokerHost::OnReadMessage(
47 #if defined(OS_WIN) 115 const MessageInTransit::View& message_view,
48 base::MessageLoopForIO::current()->RegisterIOHandler( 116 ScopedPlatformHandleVectorPtr platform_handles) {
49 pipe_.get().handle, this); 117 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
50 BeginRead(); 118 CHECK(!platform_handles);
51 #elif defined(OS_POSIX) 119 if (message_view.num_bytes() !=
52 // TOOD(jam): setup 120 static_cast<uint32_t>(sizeof(ConnectMessagePipeMessage))) {
121 NOTREACHED();
122 delete this;
123 }
124
125 const ConnectMessagePipeMessage* message =
126 static_cast<const ConnectMessagePipeMessage*>(message_view.bytes());
127 switch(message->type) {
128 case CONNECT_MESSAGE_PIPE:
129 BrokerState::GetInstance()->HandleConnectMessagePipe(this,
130 message->pipe_id);
131 break;
132 case CANCEL_CONNECT_MESSAGE_PIPE:
133 BrokerState::GetInstance()->HandleCancelConnectMessagePipe(
134 message->pipe_id);
135 break;
136 default:
137 NOTREACHED();
138 delete this;
139 }
140 }
141
142 void ChildBrokerHost::OnError(Error error) {
143 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
144 child_channel_->Shutdown();
145 child_channel_ = nullptr;
146 // On Windows, we have two pipes to the child process. It's easier to wait
147 // until we get the error from the pipe that uses asynchronous I/O.
148 #if !defined(OS_WIN)
149 delete this;
53 #endif 150 #endif
54 } 151 }
55 152
153 #if defined(OS_WIN)
154 void ChildBrokerHost::RegisterIOHandler() {
155 base::MessageLoopForIO::current()->RegisterIOHandler(
156 sync_channel_.get().handle, this);
157 BeginRead();
158 }
159
56 void ChildBrokerHost::BeginRead() { 160 void ChildBrokerHost::BeginRead() {
57 #if defined(OS_WIN) 161 BOOL rv = ReadFile(sync_channel_.get().handle,
58 BOOL rv = ReadFile(pipe_.get().handle, &read_data_[num_bytes_read_], 162 &read_data_[num_bytes_read_],
59 static_cast<int>(read_data_.size() - num_bytes_read_), 163 static_cast<int>(read_data_.size() - num_bytes_read_),
60 nullptr, &read_context_.overlapped); 164 nullptr, &read_context_.overlapped);
61 if (rv || GetLastError() == ERROR_IO_PENDING) 165 if (rv || GetLastError() == ERROR_IO_PENDING)
62 return; 166 return;
63 167
64 if (rv == ERROR_BROKEN_PIPE) { 168 if (GetLastError() == ERROR_BROKEN_PIPE) {
65 delete this; 169 delete this;
66 return; 170 return;
67 } 171 }
68 172
69 NOTREACHED() << "Unknown error in ChildBrokerHost " << rv; 173 NOTREACHED() << "Unknown error in ChildBrokerHost " << rv;
70 #endif
71 } 174 }
72 175
73 #if defined(OS_WIN)
74 void ChildBrokerHost::OnIOCompleted(base::MessageLoopForIO::IOContext* context, 176 void ChildBrokerHost::OnIOCompleted(base::MessageLoopForIO::IOContext* context,
75 DWORD bytes_transferred, 177 DWORD bytes_transferred,
76 DWORD error) { 178 DWORD error) {
77 if (context != &read_context_) 179 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
78 return;
79
80 if (error == ERROR_BROKEN_PIPE) { 180 if (error == ERROR_BROKEN_PIPE) {
81 delete this; 181 delete this;
82 return; // Child process exited or crashed. 182 return; // Child process exited or crashed.
83 } 183 }
84 184
85 if (error != ERROR_SUCCESS) { 185 if (error != ERROR_SUCCESS) {
86 NOTREACHED() << "Error " << error << " in ChildBrokerHost."; 186 NOTREACHED() << "Error " << error << " in ChildBrokerHost.";
87 delete this; 187 delete this;
88 return; 188 return;
89 } 189 }
90 190
191 if (context == &write_context_) {
192 write_data_.clear();
193 return;
194 }
195
91 num_bytes_read_ += bytes_transferred; 196 num_bytes_read_ += bytes_transferred;
92 CHECK_GE(num_bytes_read_, sizeof(uint32_t)); 197 CHECK_GE(num_bytes_read_, sizeof(uint32_t));
93 BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&read_data_[0]); 198 BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&read_data_[0]);
94 if (num_bytes_read_ < message->size) { 199 if (num_bytes_read_ < message->size) {
95 read_data_.resize(message->size); 200 read_data_.resize(message->size);
96 BeginRead(); 201 BeginRead();
97 return; 202 return;
98 } 203 }
99 204
205 // This should never fire because we only get new requests from a child
206 // process after it has read all the previous data we wrote.
207 if (!write_data_.empty()) {
208 NOTREACHED() << "ChildBrokerHost shouldn't have data to write when it gets "
209 << " a new request";
210 delete this;
211 return;
212 }
213
100 if (message->id == CREATE_PLATFORM_CHANNEL_PAIR) { 214 if (message->id == CREATE_PLATFORM_CHANNEL_PAIR) {
101 PlatformChannelPair channel_pair; 215 PlatformChannelPair channel_pair;
102 uint32_t response_size = 2 * sizeof(HANDLE); 216 uint32_t response_size = 2 * sizeof(HANDLE);
103 write_data_.resize(response_size); 217 write_data_.resize(response_size);
104 HANDLE* handles = reinterpret_cast<HANDLE*>(&write_data_[0]); 218 HANDLE* handles = reinterpret_cast<HANDLE*>(&write_data_[0]);
105 handles[0] = DuplicateToChild( 219 handles[0] = DuplicateToChild(
106 channel_pair.PassServerHandle().release().handle); 220 channel_pair.PassServerHandle().release().handle);
107 handles[1] = DuplicateToChild( 221 handles[1] = DuplicateToChild(
108 channel_pair.PassClientHandle().release().handle); 222 channel_pair.PassClientHandle().release().handle);
109 } else if (message->id == HANDLE_TO_TOKEN) { 223 } else if (message->id == HANDLE_TO_TOKEN) {
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 NOTREACHED() << "Unknown token"; 259 NOTREACHED() << "Unknown token";
146 handles[i] = INVALID_HANDLE_VALUE; 260 handles[i] = INVALID_HANDLE_VALUE;
147 } 261 }
148 } 262 }
149 } else { 263 } else {
150 NOTREACHED() << "Unknown command. Stopping reading."; 264 NOTREACHED() << "Unknown command. Stopping reading.";
151 delete this; 265 delete this;
152 return; 266 return;
153 } 267 }
154 268
155 BOOL rv = WriteFile(pipe_.get().handle, &write_data_[0], 269 BOOL rv = WriteFile(sync_channel_.get().handle, &write_data_[0],
156 static_cast<int>(write_data_.size()), NULL, 270 static_cast<int>(write_data_.size()), NULL,
157 &write_context_.overlapped); 271 &write_context_.overlapped);
158 DCHECK(rv || GetLastError() == ERROR_IO_PENDING); 272 DCHECK(rv || GetLastError() == ERROR_IO_PENDING);
159 273
160 // Start reading again. 274 // Start reading again.
161 num_bytes_read_ = 0; 275 num_bytes_read_ = 0;
162 BeginRead(); 276 BeginRead();
163 } 277 }
164 278
165 HANDLE ChildBrokerHost::DuplicateToChild(HANDLE handle) { 279 HANDLE ChildBrokerHost::DuplicateToChild(HANDLE handle) {
(...skipping 10 matching lines...) Expand all
176 BOOL result = DuplicateHandle(child_process_, handle, 290 BOOL result = DuplicateHandle(child_process_, handle,
177 base::GetCurrentProcessHandle(), &rv, 0, FALSE, 291 base::GetCurrentProcessHandle(), &rv, 0, FALSE,
178 DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); 292 DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
179 DCHECK(result); 293 DCHECK(result);
180 return rv; 294 return rv;
181 } 295 }
182 #endif 296 #endif
183 297
184 } // namespace edk 298 } // namespace edk
185 } // namespace mojo 299 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/child_broker_host.h ('k') | mojo/edk/system/core.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698