Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(896)

Side by Side Diff: mojo/edk/system/child_broker_host.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix chrome and POSIX Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/child_broker_host.h" 5 #include "mojo/edk/system/child_broker_host.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/lazy_instance.h" 8 #include "base/lazy_instance.h"
9 #include "mojo/edk/embedder/embedder_internal.h"
9 #include "mojo/edk/embedder/platform_channel_pair.h" 10 #include "mojo/edk/embedder/platform_channel_pair.h"
10 #include "mojo/edk/system/broker_messages.h" 11 #include "mojo/edk/system/broker_messages.h"
11 #include "mojo/edk/system/broker_state.h" 12 #include "mojo/edk/system/broker_state.h"
12 #include "mojo/edk/system/configuration.h" 13 #include "mojo/edk/system/configuration.h"
14 #include "mojo/edk/system/core.h"
15 #include "mojo/edk/system/platform_handle_dispatcher.h"
13 16
14 namespace mojo { 17 namespace mojo {
15 namespace edk { 18 namespace edk {
16 19
17 namespace { 20 namespace {
21 #if defined(OS_WIN)
18 static const int kDefaultReadBufferSize = 256; 22 static const int kDefaultReadBufferSize = 256;
23 #endif
19 } 24 }
20 25
21 ChildBrokerHost::ChildBrokerHost(base::ProcessHandle child_process, 26 ChildBrokerHost::ChildBrokerHost(base::ProcessHandle child_process,
22 ScopedPlatformHandle pipe) 27 ScopedPlatformHandle pipe)
23 : child_process_(child_process), 28 : process_id_(base::GetProcId(child_process)) {
24 pipe_(pipe.Pass()), 29 ScopedPlatformHandle parent_async_channel_handle;
25 num_bytes_read_(0) { 30 #if defined(OS_POSIX)
26 #if defined(OS_WIN) 31 parent_async_channel_handle = pipe.Pass();
32 #else
33 child_process_ = child_process;
34 sync_channel_ = pipe.Pass();
27 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); 35 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
28 read_context_.handler = this; 36 read_context_.handler = this;
29 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); 37 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
30 write_context_.handler = this; 38 write_context_.handler = this;
31 #else 39 read_data_.resize(kDefaultReadBufferSize);
32 // TODO(jam) 40 num_bytes_read_ = 0;
33 (void)child_process_; // Suppress -Wunused-private-field. 41
34 (void)num_bytes_read_; // Suppress -Wunused-private-field. 42 // See comment in ChildBroker::SetChildBrokerHostHandle. Summary is we need
43 // two pipes on Windows, so send the second one over the first one.
44 PlatformChannelPair parent_pipe;
45 parent_async_channel_handle = parent_pipe.PassServerHandle();
46
47 HANDLE duplicated_child_handle =
48 DuplicateToChild(parent_pipe.PassClientHandle().release().handle);
49 BOOL rv = WriteFile(sync_channel_.get().handle,
50 &duplicated_child_handle, sizeof(duplicated_child_handle),
51 NULL, &write_context_.overlapped);
52 DCHECK(rv || GetLastError() == ERROR_IO_PENDING);
53
54 internal::g_io_thread_task_runner->PostTask(
55 FROM_HERE,
56 base::Bind(&ChildBrokerHost::RegisterIOHandler, base::Unretained(this)));
35 #endif 57 #endif
36 58
37 read_data_.resize(kDefaultReadBufferSize); 59 child_channel_ = RawChannel::Create(parent_async_channel_handle.Pass());
38 BrokerState::GetInstance()->broker_thread()->PostTask( 60 internal::g_io_thread_task_runner->PostTask(
39 FROM_HERE, 61 FROM_HERE,
40 base::Bind(&ChildBrokerHost::RegisterIOHandler, base::Unretained(this))); 62 base::Bind(&RawChannel::Init, base::Unretained(child_channel_), this));
63 internal::g_io_thread_task_runner->PostTask(
64 FROM_HERE,
yzshen1 2015/12/03 23:37:50 nit: incorrect indent.
jam 2015/12/04 05:06:47 Done.
65 base::Bind(&RawChannel::EnsureLazyInitialized,
66 base::Unretained(child_channel_)));
67
68 BrokerState::GetInstance()->ChildBrokerHostCreated(this);
69 }
70
71 base::ProcessId ChildBrokerHost::GetProcessId() {
72 return process_id_;
73 }
74
75 void ChildBrokerHost::ConnectToProcess(base::ProcessId process_id,
76 ScopedPlatformHandle pipe) {
77 ConnectToProcessMessage data;
78 data.type = CONNECT_TO_PROCESS;
79 data.process_id = process_id;
80 scoped_ptr<MessageInTransit> message(new MessageInTransit(
81 MessageInTransit::Type::MESSAGE, sizeof(data), &data));
82 scoped_refptr<Dispatcher> dispatcher =
83 PlatformHandleDispatcher::Create(pipe.Pass());
84 internal::g_core->AddDispatcher(dispatcher);
85 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector);
86 dispatchers->push_back(dispatcher);
87 message->SetDispatchers(dispatchers.Pass());
88 message->SerializeAndCloseDispatchers();
89 child_channel_->WriteMessage(message.Pass());
90 }
91
92 void ChildBrokerHost::ConnectMessagePipe(uint64_t pipe_id,
93 base::ProcessId peer_pid) {
94 PeerPipeConnectedMessage data;
95 data.type = PEER_PIPE_CONNECTED;
96 data.pipe_id = pipe_id;
97 data.process_id = peer_pid;
98 scoped_ptr<MessageInTransit> message(new MessageInTransit(
99 MessageInTransit::Type::MESSAGE, sizeof(data), &data));
100 child_channel_->WriteMessage(message.Pass());
41 } 101 }
42 102
43 ChildBrokerHost::~ChildBrokerHost() { 103 ChildBrokerHost::~ChildBrokerHost() {
104 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
105 BrokerState::GetInstance()->ChildBrokerHostDestructed(this);
106 if (child_channel_)
107 child_channel_->Shutdown();
44 } 108 }
45 109
46 void ChildBrokerHost::RegisterIOHandler() { 110 void ChildBrokerHost::OnReadMessage(
47 #if defined(OS_WIN) 111 const MessageInTransit::View& message_view,
48 base::MessageLoopForIO::current()->RegisterIOHandler( 112 ScopedPlatformHandleVectorPtr platform_handles) {
49 pipe_.get().handle, this); 113 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
50 BeginRead(); 114 CHECK(!platform_handles);
51 #elif defined(OS_POSIX) 115 if (message_view.num_bytes() !=
52 // TOOD(jam): setup 116 static_cast<uint32_t>(sizeof(ConnectMessagePipeMessage))) {
117 NOTREACHED();
118 delete this;
119 }
120
121 const ConnectMessagePipeMessage* message =
122 static_cast<const ConnectMessagePipeMessage*>(message_view.bytes());
123 switch(message->type) {
124 case CONNECT_MESSAGE_PIPE:
125 BrokerState::GetInstance()->HandleConnectMessagePipe(this,
126 message->pipe_id);
127 break;
128 case CANCEL_CONNECT_MESSAGE_PIPE:
129 BrokerState::GetInstance()->HandleCancelConnectMessagePipe(
130 message->pipe_id);
131 break;
132 default:
133 NOTREACHED();
134 delete this;
135 }
136 }
137
138 void ChildBrokerHost::OnError(Error error) {
139 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
140 child_channel_->Shutdown();
141 child_channel_ = nullptr;
142 // On Windows, we have two pipes to the child process. It's easier to wait
143 // until we get the error from the pipe that uses asynchronous I/O.
144 #if !defined(OS_WIN)
145 delete this;
53 #endif 146 #endif
54 } 147 }
55 148
149 #if defined(OS_WIN)
150 void ChildBrokerHost::RegisterIOHandler() {
151 base::MessageLoopForIO::current()->RegisterIOHandler(
152 sync_channel_.get().handle, this);
153 BeginRead();
154 }
155
56 void ChildBrokerHost::BeginRead() { 156 void ChildBrokerHost::BeginRead() {
57 #if defined(OS_WIN) 157 BOOL rv = ReadFile(sync_channel_.get().handle,
58 BOOL rv = ReadFile(pipe_.get().handle, &read_data_[num_bytes_read_], 158 &read_data_[num_bytes_read_],
59 static_cast<int>(read_data_.size() - num_bytes_read_), 159 static_cast<int>(read_data_.size() - num_bytes_read_),
60 nullptr, &read_context_.overlapped); 160 nullptr, &read_context_.overlapped);
61 if (rv || GetLastError() == ERROR_IO_PENDING) 161 if (rv || GetLastError() == ERROR_IO_PENDING)
62 return; 162 return;
63 163
64 if (rv == ERROR_BROKEN_PIPE) { 164 if (GetLastError() == ERROR_BROKEN_PIPE) {
65 delete this; 165 delete this;
66 return; 166 return;
67 } 167 }
68 168
69 NOTREACHED() << "Unknown error in ChildBrokerHost " << rv; 169 NOTREACHED() << "Unknown error in ChildBrokerHost " << rv;
70 #endif
71 } 170 }
72 171
73 #if defined(OS_WIN)
74 void ChildBrokerHost::OnIOCompleted(base::MessageLoopForIO::IOContext* context, 172 void ChildBrokerHost::OnIOCompleted(base::MessageLoopForIO::IOContext* context,
75 DWORD bytes_transferred, 173 DWORD bytes_transferred,
76 DWORD error) { 174 DWORD error) {
77 if (context != &read_context_) 175 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
78 return;
79
80 if (error == ERROR_BROKEN_PIPE) { 176 if (error == ERROR_BROKEN_PIPE) {
81 delete this; 177 delete this;
82 return; // Child process exited or crashed. 178 return; // Child process exited or crashed.
83 } 179 }
84 180
85 if (error != ERROR_SUCCESS) { 181 if (error != ERROR_SUCCESS) {
86 NOTREACHED() << "Error " << error << " in ChildBrokerHost."; 182 NOTREACHED() << "Error " << error << " in ChildBrokerHost.";
87 delete this; 183 delete this;
88 return; 184 return;
89 } 185 }
90 186
187 if (context == &write_context_) {
188 write_data_.clear();
189 return;
190 }
191
91 num_bytes_read_ += bytes_transferred; 192 num_bytes_read_ += bytes_transferred;
92 CHECK_GE(num_bytes_read_, sizeof(uint32_t)); 193 CHECK_GE(num_bytes_read_, sizeof(uint32_t));
93 BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&read_data_[0]); 194 BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&read_data_[0]);
94 if (num_bytes_read_ < message->size) { 195 if (num_bytes_read_ < message->size) {
95 read_data_.resize(message->size); 196 read_data_.resize(message->size);
96 BeginRead(); 197 BeginRead();
97 return; 198 return;
98 } 199 }
99 200
201 // This should never fire because we only get new requests from a child
202 // process after it has read all the previous data we wrote.
203 CHECK(write_data_.empty());
100 if (message->id == CREATE_PLATFORM_CHANNEL_PAIR) { 204 if (message->id == CREATE_PLATFORM_CHANNEL_PAIR) {
101 PlatformChannelPair channel_pair; 205 PlatformChannelPair channel_pair;
102 uint32_t response_size = 2 * sizeof(HANDLE); 206 uint32_t response_size = 2 * sizeof(HANDLE);
103 write_data_.resize(response_size); 207 write_data_.resize(response_size);
104 HANDLE* handles = reinterpret_cast<HANDLE*>(&write_data_[0]); 208 HANDLE* handles = reinterpret_cast<HANDLE*>(&write_data_[0]);
105 handles[0] = DuplicateToChild( 209 handles[0] = DuplicateToChild(
106 channel_pair.PassServerHandle().release().handle); 210 channel_pair.PassServerHandle().release().handle);
107 handles[1] = DuplicateToChild( 211 handles[1] = DuplicateToChild(
108 channel_pair.PassClientHandle().release().handle); 212 channel_pair.PassClientHandle().release().handle);
109 } else if (message->id == HANDLE_TO_TOKEN) { 213 } else if (message->id == HANDLE_TO_TOKEN) {
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 NOTREACHED() << "Unknown token"; 249 NOTREACHED() << "Unknown token";
146 handles[i] = INVALID_HANDLE_VALUE; 250 handles[i] = INVALID_HANDLE_VALUE;
147 } 251 }
148 } 252 }
149 } else { 253 } else {
150 NOTREACHED() << "Unknown command. Stopping reading."; 254 NOTREACHED() << "Unknown command. Stopping reading.";
151 delete this; 255 delete this;
152 return; 256 return;
153 } 257 }
154 258
155 BOOL rv = WriteFile(pipe_.get().handle, &write_data_[0], 259 BOOL rv = WriteFile(sync_channel_.get().handle, &write_data_[0],
156 static_cast<int>(write_data_.size()), NULL, 260 static_cast<int>(write_data_.size()), NULL,
157 &write_context_.overlapped); 261 &write_context_.overlapped);
158 DCHECK(rv || GetLastError() == ERROR_IO_PENDING); 262 DCHECK(rv || GetLastError() == ERROR_IO_PENDING);
159 263
160 // Start reading again. 264 // Start reading again.
161 num_bytes_read_ = 0; 265 num_bytes_read_ = 0;
162 BeginRead(); 266 BeginRead();
163 } 267 }
164 268
165 HANDLE ChildBrokerHost::DuplicateToChild(HANDLE handle) { 269 HANDLE ChildBrokerHost::DuplicateToChild(HANDLE handle) {
(...skipping 10 matching lines...) Expand all
176 BOOL result = DuplicateHandle(child_process_, handle, 280 BOOL result = DuplicateHandle(child_process_, handle,
177 base::GetCurrentProcessHandle(), &rv, 0, FALSE, 281 base::GetCurrentProcessHandle(), &rv, 0, FALSE,
178 DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); 282 DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
179 DCHECK(result); 283 DCHECK(result);
180 return rv; 284 return rv;
181 } 285 }
182 #endif 286 #endif
183 287
184 } // namespace edk 288 } // namespace edk
185 } // namespace mojo 289 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698