Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/message_pipe_perftest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h" 9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h" 10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/embedder/platform_handle_utils.h" 11 #include "mojo/edk/embedder/platform_handle_utils.h"
12 #include "mojo/edk/embedder/platform_shared_buffer.h" 12 #include "mojo/edk/embedder/platform_shared_buffer.h"
13 #include "mojo/edk/embedder/platform_support.h" 13 #include "mojo/edk/embedder/platform_support.h"
14 #include "mojo/edk/system/broker.h" 14 #include "mojo/edk/system/broker.h"
15 #include "mojo/edk/system/configuration.h" 15 #include "mojo/edk/system/configuration.h"
16 #include "mojo/edk/system/message_in_transit.h" 16 #include "mojo/edk/system/message_in_transit.h"
17 #include "mojo/edk/system/options_validation.h" 17 #include "mojo/edk/system/options_validation.h"
18 #include "mojo/edk/system/transport_data.h" 18 #include "mojo/edk/system/transport_data.h"
19 19
20 namespace mojo { 20 namespace mojo {
21 namespace edk { 21 namespace edk {
22 22
23 namespace { 23 namespace {
24 24
25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
26 26
27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
28 bool transferable;
29 bool write_error;
30 uint64_t pipe_id; // If transferable is false.
31 // The following members are only set if transferable is true.
28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
29 // was closed. 33 // was closed.
30 size_t platform_handle_index; 34 size_t platform_handle_index;
31 bool write_error;
32 35
33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
34 uint32_t shared_memory_size; 37 uint32_t shared_memory_size;
35 38
36 size_t serialized_read_buffer_size; 39 size_t serialized_read_buffer_size;
37 size_t serialized_write_buffer_size; 40 size_t serialized_write_buffer_size;
38 size_t serialized_message_queue_size; 41 size_t serialized_message_queue_size;
39 42
40 // These are the FDs required as part of serializing channel_ and 43 // These are the FDs required as part of serializing channel_ and
41 // message_queue_. This is only used on POSIX. 44 // message_queue_. This is only used on POSIX.
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 86
84 const MojoCreateMessagePipeOptions 87 const MojoCreateMessagePipeOptions
85 MessagePipeDispatcher::kDefaultCreateOptions = { 88 MessagePipeDispatcher::kDefaultCreateOptions = {
86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
88 91
89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
90 const MojoCreateMessagePipeOptions* in_options, 93 const MojoCreateMessagePipeOptions* in_options,
91 MojoCreateMessagePipeOptions* out_options) { 94 MojoCreateMessagePipeOptions* out_options) {
92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
94 98
95 *out_options = kDefaultCreateOptions; 99 *out_options = kDefaultCreateOptions;
96 if (!in_options) 100 if (!in_options)
97 return MOJO_RESULT_OK; 101 return MOJO_RESULT_OK;
98 102
99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
100 if (!reader.is_valid()) 104 if (!reader.is_valid())
101 return MOJO_RESULT_INVALID_ARGUMENT; 105 return MOJO_RESULT_INVALID_ARGUMENT;
102 106
103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
104 return MOJO_RESULT_OK; 108 return MOJO_RESULT_OK;
105 if ((reader.options().flags & ~kKnownFlags)) 109 if ((reader.options().flags & ~kKnownFlags))
106 return MOJO_RESULT_UNIMPLEMENTED; 110 return MOJO_RESULT_UNIMPLEMENTED;
107 out_options->flags = reader.options().flags; 111 out_options->flags = reader.options().flags;
108 112
109 // Checks for fields beyond |flags|: 113 // Checks for fields beyond |flags|:
110 114
111 // (Nothing here yet.) 115 // (Nothing here yet.)
112 116
113 return MOJO_RESULT_OK; 117 return MOJO_RESULT_OK;
114 } 118 }
115 119
116 void MessagePipeDispatcher::Init( 120 void MessagePipeDispatcher::Init(
117 ScopedPlatformHandle message_pipe, 121 ScopedPlatformHandle message_pipe,
118 char* serialized_read_buffer, size_t serialized_read_buffer_size, 122 char* serialized_read_buffer, size_t serialized_read_buffer_size,
119 char* serialized_write_buffer, size_t serialized_write_buffer_size, 123 char* serialized_write_buffer, size_t serialized_write_buffer_size,
120 std::vector<int>* serialized_read_fds, 124 std::vector<int>* serialized_read_fds,
121 std::vector<int>* serialized_write_fds) { 125 std::vector<int>* serialized_write_fds) {
126 CHECK(transferable_);
122 if (message_pipe.get().is_valid()) { 127 if (message_pipe.get().is_valid()) {
123 channel_ = RawChannel::Create(message_pipe.Pass()); 128 channel_ = RawChannel::Create(message_pipe.Pass());
124 129
125 // TODO(jam): It's probably cleaner to pass this in Init call. 130 // TODO(jam): It's probably cleaner to pass this in Init call.
126 channel_->SetSerializedData( 131 channel_->SetSerializedData(
127 serialized_read_buffer, serialized_read_buffer_size, 132 serialized_read_buffer, serialized_read_buffer_size,
128 serialized_write_buffer, serialized_write_buffer_size, 133 serialized_write_buffer, serialized_write_buffer_size,
129 serialized_read_fds, serialized_write_fds); 134 serialized_read_fds, serialized_write_fds);
130 internal::g_io_thread_task_runner->PostTask( 135 internal::g_io_thread_task_runner->PostTask(
131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
132 } 137 }
133 } 138 }
134 139
140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
141 CHECK(!transferable_);
142 pipe_id_ = pipe_id;
143 }
144
135 void MessagePipeDispatcher::InitOnIO() { 145 void MessagePipeDispatcher::InitOnIO() {
136 base::AutoLock locker(lock()); 146 base::AutoLock locker(lock());
147 CHECK(transferable_);
137 calling_init_ = true; 148 calling_init_ = true;
138 if (channel_) 149 if (channel_)
139 channel_->Init(this); 150 channel_->Init(this);
140 calling_init_ = false; 151 calling_init_ = false;
141 } 152 }
142 153
143 void MessagePipeDispatcher::CloseOnIO() { 154 void MessagePipeDispatcher::CloseOnIO() {
144 base::AutoLock locker(lock()); 155 base::AutoLock locker(lock());
156 if (transferable_) {
157 if (channel_) {
158 channel_->Shutdown();
159 channel_ = nullptr;
160 }
161 } else {
162 if (non_transferable_state_ == CONNECT_CALLED ||
163 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
164 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
165 RequestNontransferableChannel();
145 166
146 if (channel_) { 167 // We can't cancel the pending request yet, since the other side of the
147 channel_->Shutdown(); 168 // message pipe would want to get pending outgoing messages (if any) or
148 channel_ = nullptr; 169 // at least know that this end was closed. So keep this object alive until
170 // then.
171 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
172 AddRef();
173 } else if (non_transferable_state_ == CONNECTED) {
174 internal::g_broker->CloseMessagePipe(pipe_id_, this);
175 non_transferable_state_ = CLOSED;
176 channel_ = nullptr;
177 }
149 } 178 }
150 } 179 }
151 180
152 Dispatcher::Type MessagePipeDispatcher::GetType() const { 181 Dispatcher::Type MessagePipeDispatcher::GetType() const {
153 return Type::MESSAGE_PIPE; 182 return Type::MESSAGE_PIPE;
154 } 183 }
155 184
185 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
186 base::AutoLock locker(lock());
187 channel_ = channel;
188 while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
189 channel_->WriteMessage(
190 non_transferable_outgoing_message_queue_.GetMessage());
191 }
192
193 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
194 // We kept this object alive until it's connected, we can release it now.
195 // Since we're in a callback from the Broker, call it asynchronously.
196 internal::g_io_thread_task_runner->PostTask(
197 FROM_HERE,
198 base::Bind(&Broker::CloseMessagePipe,
199 base::Unretained(internal::g_broker), pipe_id_,
200 base::Unretained(this)));
201 non_transferable_state_ = CLOSED;
202 channel_ = nullptr;
203 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
204 } else {
205 non_transferable_state_ = CONNECTED;
206 }
207 }
208
156 #if defined(OS_WIN) 209 #if defined(OS_WIN)
157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the 210 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
158 // best way we want to share this. 211 // best way we want to share this.
159 // Since this is used for serialization of messages read/written to a MP that 212 // Since this is used for serialization of messages read/written to a MP that
160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of 213 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
161 // them when a MP is being sent. As a result, even for POSIX we will probably 214 // them when a MP is being sent. As a result, even for POSIX we will probably
162 // want to send the handles to the shell process and exchange them for tokens 215 // want to send the handles to the shell process and exchange them for tokens
163 // (since we can be sure that the shell will respond to our IPCs, compared to 216 // (since we can be sure that the shell will respond to our IPCs, compared to
164 // the other end where we're sending the MP to, which may not be reading...). 217 // the other end where we're sending the MP to, which may not be reading...).
165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( 218 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
(...skipping 13 matching lines...) Expand all
179 const void* source, 232 const void* source,
180 size_t size, 233 size_t size,
181 PlatformHandleVector* platform_handles) { 234 PlatformHandleVector* platform_handles) {
182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { 235 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) {
183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; 236 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)";
184 return nullptr; 237 return nullptr;
185 } 238 }
186 239
187 const SerializedMessagePipeHandleDispatcher* serialization = 240 const SerializedMessagePipeHandleDispatcher* serialization =
188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); 241 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
242
243 scoped_refptr<MessagePipeDispatcher> rv(
244 new MessagePipeDispatcher(serialization->transferable));
245 if (!rv->transferable_) {
246 rv->InitNonTransferable(serialization->pipe_id);
247 return rv;
248 }
249
189 if (serialization->shared_memory_size != 250 if (serialization->shared_memory_size !=
190 (serialization->serialized_read_buffer_size + 251 (serialization->serialized_read_buffer_size +
191 serialization->serialized_write_buffer_size + 252 serialization->serialized_write_buffer_size +
192 serialization->serialized_message_queue_size)) { 253 serialization->serialized_message_queue_size)) {
193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; 254 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)";
194 return nullptr; 255 return nullptr;
195 } 256 }
196 257
197 ScopedPlatformHandle platform_handle, shared_memory_handle; 258 ScopedPlatformHandle platform_handle, shared_memory_handle;
198 if (!GetHandle(serialization->platform_handle_index, 259 if (!GetHandle(serialization->platform_handle_index,
(...skipping 27 matching lines...) Expand all
226 serialization->serialized_write_buffer_size; 287 serialization->serialized_write_buffer_size;
227 buffer += serialized_write_buffer_size; 288 buffer += serialized_write_buffer_size;
228 } 289 }
229 if (serialization->serialized_message_queue_size) { 290 if (serialization->serialized_message_queue_size) {
230 message_queue_data = buffer; 291 message_queue_data = buffer;
231 message_queue_size = serialization->serialized_message_queue_size; 292 message_queue_size = serialization->serialized_message_queue_size;
232 buffer += message_queue_size; 293 buffer += message_queue_size;
233 } 294 }
234 } 295 }
235 296
236 scoped_refptr<MessagePipeDispatcher> rv(
237 Create(MessagePipeDispatcher::kDefaultCreateOptions));
238 rv->write_error_ = serialization->write_error; 297 rv->write_error_ = serialization->write_error;
239 298
240 std::vector<int> serialized_read_fds; 299 std::vector<int> serialized_read_fds;
241 std::vector<int> serialized_write_fds; 300 std::vector<int> serialized_write_fds;
242 #if defined(OS_POSIX) 301 #if defined(OS_POSIX)
243 std::vector<int> serialized_fds; 302 std::vector<int> serialized_fds;
244 size_t serialized_fds_index = 0; 303 size_t serialized_fds_index = 0;
245 304
246 size_t total_fd_count = serialization->serialized_read_fds_length + 305 size_t total_fd_count = serialization->serialized_read_fds_length +
247 serialization->serialized_write_fds_length + 306 serialization->serialized_write_fds_length +
(...skipping 14 matching lines...) Expand all
262 serialized_fds_index += serialization->serialized_read_fds_length; 321 serialized_fds_index += serialization->serialized_read_fds_length;
263 serialized_write_fds.assign( 322 serialized_write_fds.assign(
264 serialized_fds.begin() + serialized_fds_index, 323 serialized_fds.begin() + serialized_fds_index,
265 serialized_fds.begin() + serialized_fds_index + 324 serialized_fds.begin() + serialized_fds_index +
266 serialization->serialized_write_fds_length); 325 serialization->serialized_write_fds_length);
267 serialized_fds_index += serialization->serialized_write_fds_length; 326 serialized_fds_index += serialization->serialized_write_fds_length;
268 #endif 327 #endif
269 328
270 while (message_queue_size) { 329 while (message_queue_size) {
271 size_t message_size; 330 size_t message_size;
272 CHECK(MessageInTransit::GetNextMessageSize( 331 if (!MessageInTransit::GetNextMessageSize(
273 message_queue_data, message_queue_size, &message_size)); 332 message_queue_data, message_queue_size, &message_size)) {
333 NOTREACHED() << "Couldn't read message size from serialized data.";
334 return nullptr;
335 }
336 if (message_size > message_queue_size) {
337 NOTREACHED() << "Invalid serialized message size.";
338 return nullptr;
339 }
274 MessageInTransit::View message_view(message_size, message_queue_data); 340 MessageInTransit::View message_view(message_size, message_queue_data);
275 message_queue_size -= message_size; 341 message_queue_size -= message_size;
276 message_queue_data += message_size; 342 message_queue_data += message_size;
277 343
278 // TODO(jam): Copied below from RawChannelWin. See commment above 344 // TODO(jam): Copied below from RawChannelWin. See commment above
279 // GetReadPlatformHandles. 345 // GetReadPlatformHandles.
280 ScopedPlatformHandleVectorPtr temp_platform_handles; 346 ScopedPlatformHandleVectorPtr temp_platform_handles;
281 if (message_view.transport_data_buffer()) { 347 if (message_view.transport_data_buffer()) {
282 size_t num_platform_handles; 348 size_t num_platform_handles;
283 const void* platform_handle_table; 349 const void* platform_handle_table;
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 &serialized_write_fds); 392 &serialized_write_fds);
327 393
328 if (message_queue_size) { // Should be empty by now. 394 if (message_queue_size) { // Should be empty by now.
329 LOG(ERROR) << "Invalid queued messages"; 395 LOG(ERROR) << "Invalid queued messages";
330 return nullptr; 396 return nullptr;
331 } 397 }
332 398
333 return rv; 399 return rv;
334 } 400 }
335 401
336 MessagePipeDispatcher::MessagePipeDispatcher() 402 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
337 : channel_(nullptr), 403 : channel_(nullptr),
338 serialized_(false),
339 serialized_read_fds_length_(0u), 404 serialized_read_fds_length_(0u),
340 serialized_write_fds_length_(0u), 405 serialized_write_fds_length_(0u),
341 serialized_message_fds_length_(0u), 406 serialized_message_fds_length_(0u),
407 pipe_id_(0),
408 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
409 serialized_(false),
342 calling_init_(false), 410 calling_init_(false),
343 write_error_(false) { 411 write_error_(false),
412 transferable_(transferable) {
344 } 413 }
345 414
346 MessagePipeDispatcher::~MessagePipeDispatcher() { 415 MessagePipeDispatcher::~MessagePipeDispatcher() {
347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The 416 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
348 // exception is if they posted a task to run CloseOnIO but the IO thread shut 417 // exception is if they posted a task to run CloseOnIO but the IO thread shut
349 // down and so when it was deleting pending tasks it caused the last reference 418 // down and so when it was deleting pending tasks it caused the last reference
350 // to destruct this object. In that case, safe to destroy the channel. 419 // to destruct this object. In that case, safe to destroy the channel.
351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 420 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
352 channel_->Shutdown(); 421 channel_->Shutdown();
353 else 422 else
354 DCHECK(!channel_); 423 DCHECK(!channel_);
355 #if defined(OS_POSIX) 424 #if defined(OS_POSIX)
356 ClosePlatformHandles(&serialized_fds_); 425 ClosePlatformHandles(&serialized_fds_);
357 #endif 426 #endif
358 } 427 }
359 428
360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { 429 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
361 lock().AssertAcquired(); 430 lock().AssertAcquired();
362 awakable_list_.CancelAll(); 431 awakable_list_.CancelAll();
363 } 432 }
364 433
365 void MessagePipeDispatcher::CloseImplNoLock() { 434 void MessagePipeDispatcher::CloseImplNoLock() {
366 lock().AssertAcquired(); 435 lock().AssertAcquired();
367 internal::g_io_thread_task_runner->PostTask( 436 internal::g_io_thread_task_runner->PostTask(
368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); 437 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
369 } 438 }
370 439
371 void MessagePipeDispatcher::SerializeInternal() { 440 void MessagePipeDispatcher::SerializeInternal() {
441 serialized_ = true;
442 if (!transferable_) {
443 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
444 << "Non transferable message pipe being sent after read/write. "
445 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
446 << "the pipe can be sent after it's read or written.";
447 non_transferable_state_ = SERIALISED;
448 return;
449 }
450
372 // We need to stop watching handle immediately, even though not on IO thread, 451 // We need to stop watching handle immediately, even though not on IO thread,
373 // so that other messages aren't read after this. 452 // so that other messages aren't read after this.
374 std::vector<int> serialized_read_fds, serialized_write_fds; 453 std::vector<int> serialized_read_fds, serialized_write_fds;
375 if (channel_) { 454 if (channel_) {
376 bool write_error = false; 455 bool write_error = false;
377 456
378 serialized_platform_handle_ = channel_->ReleaseHandle( 457 serialized_platform_handle_ = channel_->ReleaseHandle(
379 &serialized_read_buffer_, &serialized_write_buffer_, 458 &serialized_read_buffer_, &serialized_write_buffer_,
380 &serialized_read_fds, &serialized_write_fds, &write_error); 459 &serialized_read_fds, &serialized_write_fds, &write_error);
381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), 460 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(),
382 serialized_read_fds.end()); 461 serialized_read_fds.end());
383 serialized_read_fds_length_ = serialized_read_fds.size(); 462 serialized_read_fds_length_ = serialized_read_fds.size();
384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), 463 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(),
385 serialized_write_fds.end()); 464 serialized_write_fds.end());
386 serialized_write_fds_length_ = serialized_write_fds.size(); 465 serialized_write_fds_length_ = serialized_write_fds.size();
387 channel_ = nullptr; 466 channel_ = nullptr;
388 if (write_error)
389 write_error = true;
390 } else { 467 } else {
391 // It's valid that the other side wrote some data and closed its end. 468 // It's valid that the other side wrote some data and closed its end.
392 } 469 }
393 470
394 DCHECK(serialized_message_queue_.empty()); 471 DCHECK(serialized_message_queue_.empty());
395 while (!message_queue_.IsEmpty()) { 472 while (!message_queue_.IsEmpty()) {
396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); 473 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
397 474
398 // When MojoWriteMessage is called, the MessageInTransit doesn't have 475 // When MojoWriteMessage is called, the MessageInTransit doesn't have
399 // dispatchers set and CreateEquivaent... is called since the dispatchers 476 // dispatchers set and CreateEquivaent... is called since the dispatchers
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
434 &all_platform_handles->at(0), all_platform_handles->size(), tokens); 511 &all_platform_handles->at(0), all_platform_handles->size(), tokens);
435 for (size_t i = 0; i < all_platform_handles->size(); i++) 512 for (size_t i = 0; i < all_platform_handles->size(); i++)
436 all_platform_handles->at(i) = PlatformHandle(); 513 all_platform_handles->at(i) = PlatformHandle();
437 #else 514 #else
438 for (size_t i = 0; i < all_platform_handles->size(); i++) { 515 for (size_t i = 0; i < all_platform_handles->size(); i++) {
439 serialized_fds_.push_back(all_platform_handles->at(i).fd); 516 serialized_fds_.push_back(all_platform_handles->at(i).fd);
440 serialized_message_fds_length_++; 517 serialized_message_fds_length_++;
441 all_platform_handles->at(i) = PlatformHandle(); 518 all_platform_handles->at(i) = PlatformHandle();
442 } 519 }
443 #endif 520 #endif
521 }
444 522
445 serialized_message_queue_.insert( 523 serialized_message_queue_.insert(
446 serialized_message_queue_.end(), 524 serialized_message_queue_.end(),
447 static_cast<const char*>(message->transport_data()->buffer()), 525 static_cast<const char*>(message->transport_data()->buffer()),
448 static_cast<const char*>(message->transport_data()->buffer()) + 526 static_cast<const char*>(message->transport_data()->buffer()) +
449 transport_data_buffer_size); 527 transport_data_buffer_size);
450 }
451 } 528 }
452 529
453 for (size_t i = 0; i < dispatchers.size(); ++i) 530 for (size_t i = 0; i < dispatchers.size(); ++i)
454 dispatchers[i]->TransportEnded(); 531 dispatchers[i]->TransportEnded();
455 } 532 }
456
457 serialized_ = true;
458 } 533 }
459 534
460 scoped_refptr<Dispatcher> 535 scoped_refptr<Dispatcher>
461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 536 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
462 lock().AssertAcquired(); 537 lock().AssertAcquired();
463 538
464 SerializeInternal(); 539 SerializeInternal();
465 540
466 // TODO(vtl): Currently, there are no options, so we just use 541 scoped_refptr<MessagePipeDispatcher> rv(
467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options 542 new MessagePipeDispatcher(transferable_));
468 // too.
469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
471 serialized_message_queue_.swap(rv->serialized_message_queue_);
472 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
473 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
474 serialized_fds_.swap(rv->serialized_fds_);
475 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
476 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
477 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
478 rv->serialized_ = true; 543 rv->serialized_ = true;
479 rv->write_error_ = write_error_; 544 if (transferable_) {
480 return scoped_refptr<Dispatcher>(rv.get()); 545 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
546 serialized_message_queue_.swap(rv->serialized_message_queue_);
547 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
548 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
549 serialized_fds_.swap(rv->serialized_fds_);
550 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
551 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
552 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
553 rv->write_error_ = write_error_;
554 } else {
555 rv->pipe_id_ = pipe_id_;
556 rv->non_transferable_state_ = non_transferable_state_;
557 }
558 return rv;
481 } 559 }
482 560
483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( 561 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
484 const void* bytes, 562 const void* bytes,
485 uint32_t num_bytes, 563 uint32_t num_bytes,
486 std::vector<DispatcherTransport>* transports, 564 std::vector<DispatcherTransport>* transports,
487 MojoWriteMessageFlags flags) { 565 MojoWriteMessageFlags flags) {
566 lock().AssertAcquired();
488 567
489 DCHECK(!transports || 568 DCHECK(!transports ||
490 (transports->size() > 0 && 569 (transports->size() > 0 &&
491 transports->size() <= GetConfiguration().max_message_num_handles)); 570 transports->size() <= GetConfiguration().max_message_num_handles));
492 571
493 lock().AssertAcquired(); 572 if (write_error_ ||
494 573 (transferable_ && !channel_) ||
495 if (!channel_ || write_error_) 574 (!transferable_ && non_transferable_state_ == CLOSED)) {
496 return MOJO_RESULT_FAILED_PRECONDITION; 575 return MOJO_RESULT_FAILED_PRECONDITION;
576 }
497 577
498 if (num_bytes > GetConfiguration().max_message_num_bytes) 578 if (num_bytes > GetConfiguration().max_message_num_bytes)
499 return MOJO_RESULT_RESOURCE_EXHAUSTED; 579 return MOJO_RESULT_RESOURCE_EXHAUSTED;
500 scoped_ptr<MessageInTransit> message(new MessageInTransit( 580 scoped_ptr<MessageInTransit> message(new MessageInTransit(
501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); 581 MessageInTransit::Type::MESSAGE, num_bytes, bytes));
502 if (transports) { 582 if (transports) {
503 MojoResult result = AttachTransportsNoLock(message.get(), transports); 583 MojoResult result = AttachTransportsNoLock(message.get(), transports);
504 if (result != MOJO_RESULT_OK) 584 if (result != MOJO_RESULT_OK)
505 return result; 585 return result;
506 } 586 }
507 587
508 message->SerializeAndCloseDispatchers(); 588 message->SerializeAndCloseDispatchers();
509 channel_->WriteMessage(message.Pass()); 589 if (!transferable_)
590 message->set_route_id(pipe_id_);
591 if (!transferable_ &&
592 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
593 non_transferable_state_ == CONNECT_CALLED)) {
594 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
595 RequestNontransferableChannel();
596 non_transferable_outgoing_message_queue_.AddMessage(message.Pass());
597 } else {
598 channel_->WriteMessage(message.Pass());
599 }
510 600
511 return MOJO_RESULT_OK; 601 return MOJO_RESULT_OK;
512 } 602 }
513 603
514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( 604 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
515 void* bytes, 605 void* bytes,
516 uint32_t* num_bytes, 606 uint32_t* num_bytes,
517 DispatcherVector* dispatchers, 607 DispatcherVector* dispatchers,
518 uint32_t* num_dispatchers, 608 uint32_t* num_dispatchers,
519 MojoReadMessageFlags flags) { 609 MojoReadMessageFlags flags) {
520 lock().AssertAcquired(); 610 lock().AssertAcquired();
521 if (channel_) 611 if (channel_) {
522 channel_->EnsureLazyInitialized(); 612 channel_->EnsureLazyInitialized();
613 } else if (!transferable_) {
614 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
615 RequestNontransferableChannel();
616 return MOJO_RESULT_SHOULD_WAIT;
617 } else if (non_transferable_state_ == CONNECT_CALLED) {
618 return MOJO_RESULT_SHOULD_WAIT;
619 }
620 }
621
523 DCHECK(!dispatchers || dispatchers->empty()); 622 DCHECK(!dispatchers || dispatchers->empty());
524 623
525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; 624 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; 625 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
527 626
528 if (message_queue_.IsEmpty()) 627 if (message_queue_.IsEmpty())
529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; 628 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION;
530 629
531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop 630 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
532 // and release the lock immediately. 631 // and release the lock immediately.
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 return MOJO_RESULT_OK; 675 return MOJO_RESULT_OK;
577 } 676 }
578 677
579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() 678 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
580 const { 679 const {
581 lock().AssertAcquired(); 680 lock().AssertAcquired();
582 681
583 HandleSignalsState rv; 682 HandleSignalsState rv;
584 if (!message_queue_.IsEmpty()) 683 if (!message_queue_.IsEmpty())
585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 684 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
586 if (channel_ || !message_queue_.IsEmpty()) 685 if (!message_queue_.IsEmpty() ||
686 (transferable_ && channel_) ||
687 (!transferable_ && non_transferable_state_ != CLOSED))
587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
588 if (channel_ && !write_error_) { 689 if (!write_error_ &&
690 ((transferable_ && channel_) ||
691 (!transferable_ && non_transferable_state_ != CLOSED))) {
589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 692 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 693 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
591 } 694 }
592 if (!channel_ || write_error_) 695 if (write_error_ ||
696 (transferable_ && !channel_) ||
697 (!transferable_ &&
698 ((non_transferable_state_ == CLOSED) || is_closed()))) {
593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 699 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
700 }
594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 701 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
595 return rv; 702 return rv;
596 } 703 }
597 704
598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( 705 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
599 Awakable* awakable, 706 Awakable* awakable,
600 MojoHandleSignals signals, 707 MojoHandleSignals signals,
601 uintptr_t context, 708 uintptr_t context,
602 HandleSignalsState* signals_state) { 709 HandleSignalsState* signals_state) {
603 lock().AssertAcquired(); 710 lock().AssertAcquired();
604 if (channel_) 711 if (channel_) {
605 channel_->EnsureLazyInitialized(); 712 channel_->EnsureLazyInitialized();
713 } else if (!transferable_ &&
714 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
715 RequestNontransferableChannel();
716 }
717
606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 718 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
607 if (state.satisfies(signals)) { 719 if (state.satisfies(signals)) {
608 if (signals_state) 720 if (signals_state)
609 *signals_state = state; 721 *signals_state = state;
610 return MOJO_RESULT_ALREADY_EXISTS; 722 return MOJO_RESULT_ALREADY_EXISTS;
611 } 723 }
612 if (!state.can_satisfy(signals)) { 724 if (!state.can_satisfy(signals)) {
613 if (signals_state) 725 if (signals_state)
614 *signals_state = state; 726 *signals_state = state;
615 return MOJO_RESULT_FAILED_PRECONDITION; 727 return MOJO_RESULT_FAILED_PRECONDITION;
(...skipping 30 matching lines...) Expand all
646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); 758 *max_size = sizeof(SerializedMessagePipeHandleDispatcher);
647 } 759 }
648 760
649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( 761 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
650 void* destination, 762 void* destination,
651 size_t* actual_size, 763 size_t* actual_size,
652 PlatformHandleVector* platform_handles) { 764 PlatformHandleVector* platform_handles) {
653 CloseImplNoLock(); 765 CloseImplNoLock();
654 SerializedMessagePipeHandleDispatcher* serialization = 766 SerializedMessagePipeHandleDispatcher* serialization =
655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); 767 static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
768 serialization->transferable = transferable_;
769 serialization->pipe_id = pipe_id_;
656 if (serialized_platform_handle_.is_valid()) { 770 if (serialized_platform_handle_.is_valid()) {
657 serialization->platform_handle_index = platform_handles->size(); 771 serialization->platform_handle_index = platform_handles->size();
658 platform_handles->push_back(serialized_platform_handle_.release()); 772 platform_handles->push_back(serialized_platform_handle_.release());
659 } else { 773 } else {
660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; 774 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
661 } 775 }
662 776
663 serialization->write_error = write_error_; 777 serialization->write_error = write_error_;
664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); 778 serialization->serialized_read_buffer_size = serialized_read_buffer_.size();
665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); 779 serialization->serialized_write_buffer_size = serialized_write_buffer_.size();
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 break; 903 break;
790 } 904 }
791 905
792 if (started_transport_.Try()) { 906 if (started_transport_.Try()) {
793 base::AutoLock locker(lock()); 907 base::AutoLock locker(lock());
794 // We can get two OnError callbacks before the post task below completes. 908 // We can get two OnError callbacks before the post task below completes.
795 // Although RawChannel still has a pointer to this object until Shutdown is 909 // Although RawChannel still has a pointer to this object until Shutdown is
796 // called, that is safe since this class always does a PostTask to the IO 910 // called, that is safe since this class always does a PostTask to the IO
797 // thread to self destruct. 911 // thread to self destruct.
798 if (channel_ && error != ERROR_WRITE) { 912 if (channel_ && error != ERROR_WRITE) {
799 channel_->Shutdown(); 913 if (transferable_) {
914 channel_->Shutdown();
915 } else {
916 CHECK_NE(non_transferable_state_, CLOSED);
917 // Since we're in a callback from the Broker, call it asynchronously.
918 internal::g_io_thread_task_runner->PostTask(
919 FROM_HERE,
920 base::Bind(&Broker::CloseMessagePipe,
921 base::Unretained(internal::g_broker), pipe_id_,
922 base::Unretained(this)));
923 non_transferable_state_ = CLOSED;
924 }
800 channel_ = nullptr; 925 channel_ = nullptr;
801 } 926 }
802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 927 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
803 started_transport_.Release(); 928 started_transport_.Release();
804 } else { 929 } else {
805 // We must be waiting to call ReleaseHandle. It will call Shutdown. 930 // We must be waiting to call ReleaseHandle. It will call Shutdown.
806 } 931 }
807 } 932 }
808 933
809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( 934 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
810 MessageInTransit* message, 935 MessageInTransit* message,
811 std::vector<DispatcherTransport>* transports) { 936 std::vector<DispatcherTransport>* transports) {
812 DCHECK(!message->has_dispatchers()); 937 DCHECK(!message->has_dispatchers());
813 938
814 // You're not allowed to send either handle to a message pipe over the message 939 // You're not allowed to send either handle to a message pipe over the message
815 // pipe, so check for this. (The case of trying to write a handle to itself is 940 // pipe, so check for this. (The case of trying to write a handle to itself is
816 // taken care of by |Core|. That case kind of makes sense, but leads to 941 // taken care of by |Core|. That case kind of makes sense, but leads to
817 // complications if, e.g., both sides try to do the same thing with their 942 // complications if, e.g., both sides try to do the same thing with their
818 // respective handles simultaneously. The other case, of trying to write the 943 // respective handles simultaneously. The other case, of trying to write the
819 // peer handle to a handle, doesn't make sense -- since no handle will be 944 // peer handle to a handle, doesn't make sense -- since no handle will be
820 // available to read the message from.) 945 // available to read the message from.)
821 for (size_t i = 0; i < transports->size(); i++) { 946 for (size_t i = 0; i < transports->size(); i++) {
822 if (!(*transports)[i].is_valid()) 947 if (!(*transports)[i].is_valid())
823 continue; 948 continue;
824 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { 949 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
825 MessagePipeDispatcher* mp = 950 MessagePipeDispatcher* mp =
826 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); 951 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
827 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { 952 if (transferable_ && mp->transferable_ &&
953 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
828 // The other case should have been disallowed by |Core|. (Note: |port| 954 // The other case should have been disallowed by |Core|. (Note: |port|
829 // is the peer port of the handle given to |WriteMessage()|.) 955 // is the peer port of the handle given to |WriteMessage()|.)
830 return MOJO_RESULT_INVALID_ARGUMENT; 956 return MOJO_RESULT_INVALID_ARGUMENT;
957 } else if (!transferable_ && !mp->transferable_ &&
958 pipe_id_ == mp->pipe_id_) {
959 return MOJO_RESULT_INVALID_ARGUMENT;
831 } 960 }
832 } 961 }
833 } 962 }
834 963
835 // Clone the dispatchers and attach them to the message. (This must be done as 964 // Clone the dispatchers and attach them to the message. (This must be done as
836 // a separate loop, since we want to leave the dispatchers alone on failure.) 965 // a separate loop, since we want to leave the dispatchers alone on failure.)
837 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); 966 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
838 dispatchers->reserve(transports->size()); 967 dispatchers->reserve(transports->size());
839 for (size_t i = 0; i < transports->size(); i++) { 968 for (size_t i = 0; i < transports->size(); i++) {
840 if ((*transports)[i].is_valid()) { 969 if ((*transports)[i].is_valid()) {
841 dispatchers->push_back( 970 dispatchers->push_back(
842 (*transports)[i].CreateEquivalentDispatcherAndClose()); 971 (*transports)[i].CreateEquivalentDispatcherAndClose());
843 } else { 972 } else {
844 LOG(WARNING) << "Enqueueing null dispatcher"; 973 LOG(WARNING) << "Enqueueing null dispatcher";
845 dispatchers->push_back(nullptr); 974 dispatchers->push_back(nullptr);
846 } 975 }
847 } 976 }
848 message->SetDispatchers(dispatchers.Pass()); 977 message->SetDispatchers(dispatchers.Pass());
849 return MOJO_RESULT_OK; 978 return MOJO_RESULT_OK;
850 } 979 }
851 980
981 void MessagePipeDispatcher::RequestNontransferableChannel() {
982 lock().AssertAcquired();
983 CHECK(!transferable_);
984 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
985 non_transferable_state_ = CONNECT_CALLED;
986
987 // PostTask since the broker can call us back synchronously.
988 internal::g_io_thread_task_runner->PostTask(
989 FROM_HERE,
990 base::Bind(&Broker::ConnectMessagePipe,
991 base::Unretained(internal::g_broker), pipe_id_,
992 base::Unretained(this)));
993 }
994
852 } // namespace edk 995 } // namespace edk
853 } // namespace mojo 996 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/message_pipe_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698