Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h" 9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h" 10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/embedder/platform_handle_utils.h" 11 #include "mojo/edk/embedder/platform_handle_utils.h"
12 #include "mojo/edk/embedder/platform_shared_buffer.h" 12 #include "mojo/edk/embedder/platform_shared_buffer.h"
13 #include "mojo/edk/embedder/platform_support.h" 13 #include "mojo/edk/embedder/platform_support.h"
14 #include "mojo/edk/system/broker.h" 14 #include "mojo/edk/system/broker.h"
15 #include "mojo/edk/system/configuration.h" 15 #include "mojo/edk/system/configuration.h"
16 #include "mojo/edk/system/message_in_transit.h" 16 #include "mojo/edk/system/message_in_transit.h"
17 #include "mojo/edk/system/options_validation.h" 17 #include "mojo/edk/system/options_validation.h"
18 #include "mojo/edk/system/transport_data.h" 18 #include "mojo/edk/system/transport_data.h"
19 19
20 namespace mojo { 20 namespace mojo {
21 namespace edk { 21 namespace edk {
22 22
23 namespace { 23 namespace {
24 24
25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
26 26
27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
28 bool transferable;
29 bool write_error;
30 uint64_t pipe_id; // If transferable is false.
31 // The following members are only set if transferable is true.
28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
29 // was closed. 33 // was closed.
30 size_t platform_handle_index; 34 size_t platform_handle_index;
31 bool write_error;
32 35
33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
34 uint32_t shared_memory_size; 37 uint32_t shared_memory_size;
35 38
36 size_t serialized_read_buffer_size; 39 size_t serialized_read_buffer_size;
37 size_t serialized_write_buffer_size; 40 size_t serialized_write_buffer_size;
38 size_t serialized_message_queue_size; 41 size_t serialized_message_queue_size;
39 42
40 // These are the FDs required as part of serializing channel_ and 43 // These are the FDs required as part of serializing channel_ and
41 // message_queue_. This is only used on POSIX. 44 // message_queue_. This is only used on POSIX.
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 86
84 const MojoCreateMessagePipeOptions 87 const MojoCreateMessagePipeOptions
85 MessagePipeDispatcher::kDefaultCreateOptions = { 88 MessagePipeDispatcher::kDefaultCreateOptions = {
86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
88 91
89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
90 const MojoCreateMessagePipeOptions* in_options, 93 const MojoCreateMessagePipeOptions* in_options,
91 MojoCreateMessagePipeOptions* out_options) { 94 MojoCreateMessagePipeOptions* out_options) {
92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
94 98
95 *out_options = kDefaultCreateOptions; 99 *out_options = kDefaultCreateOptions;
96 if (!in_options) 100 if (!in_options)
97 return MOJO_RESULT_OK; 101 return MOJO_RESULT_OK;
98 102
99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
100 if (!reader.is_valid()) 104 if (!reader.is_valid())
101 return MOJO_RESULT_INVALID_ARGUMENT; 105 return MOJO_RESULT_INVALID_ARGUMENT;
102 106
103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
104 return MOJO_RESULT_OK; 108 return MOJO_RESULT_OK;
105 if ((reader.options().flags & ~kKnownFlags)) 109 if ((reader.options().flags & ~kKnownFlags))
106 return MOJO_RESULT_UNIMPLEMENTED; 110 return MOJO_RESULT_UNIMPLEMENTED;
107 out_options->flags = reader.options().flags; 111 out_options->flags = reader.options().flags;
108 112
109 // Checks for fields beyond |flags|: 113 // Checks for fields beyond |flags|:
110 114
111 // (Nothing here yet.) 115 // (Nothing here yet.)
112 116
113 return MOJO_RESULT_OK; 117 return MOJO_RESULT_OK;
114 } 118 }
115 119
116 void MessagePipeDispatcher::Init( 120 void MessagePipeDispatcher::Init(
117 ScopedPlatformHandle message_pipe, 121 ScopedPlatformHandle message_pipe,
118 char* serialized_read_buffer, size_t serialized_read_buffer_size, 122 char* serialized_read_buffer, size_t serialized_read_buffer_size,
119 char* serialized_write_buffer, size_t serialized_write_buffer_size, 123 char* serialized_write_buffer, size_t serialized_write_buffer_size,
120 std::vector<int>* serialized_read_fds, 124 std::vector<int>* serialized_read_fds,
121 std::vector<int>* serialized_write_fds) { 125 std::vector<int>* serialized_write_fds) {
126 CHECK(transferable_);
122 if (message_pipe.get().is_valid()) { 127 if (message_pipe.get().is_valid()) {
123 channel_ = RawChannel::Create(message_pipe.Pass()); 128 channel_ = RawChannel::Create(message_pipe.Pass());
124 129
125 // TODO(jam): It's probably cleaner to pass this in Init call. 130 // TODO(jam): It's probably cleaner to pass this in Init call.
126 channel_->SetSerializedData( 131 channel_->SetSerializedData(
127 serialized_read_buffer, serialized_read_buffer_size, 132 serialized_read_buffer, serialized_read_buffer_size,
128 serialized_write_buffer, serialized_write_buffer_size, 133 serialized_write_buffer, serialized_write_buffer_size,
129 serialized_read_fds, serialized_write_fds); 134 serialized_read_fds, serialized_write_fds);
130 internal::g_io_thread_task_runner->PostTask( 135 internal::g_io_thread_task_runner->PostTask(
131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
132 } 137 }
133 } 138 }
134 139
140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
141 CHECK(!transferable_);
142 pipe_id_ = pipe_id;
143 }
144
135 void MessagePipeDispatcher::InitOnIO() { 145 void MessagePipeDispatcher::InitOnIO() {
136 base::AutoLock locker(lock()); 146 base::AutoLock locker(lock());
147 CHECK(transferable_);
137 calling_init_ = true; 148 calling_init_ = true;
138 if (channel_) 149 if (channel_)
139 channel_->Init(this); 150 channel_->Init(this);
140 calling_init_ = false; 151 calling_init_ = false;
141 } 152 }
142 153
143 void MessagePipeDispatcher::CloseOnIO() { 154 void MessagePipeDispatcher::CloseOnIO() {
144 base::AutoLock locker(lock()); 155 base::AutoLock locker(lock());
145 156 if (transferable_) {
146 if (channel_) { 157 if (channel_) {
147 channel_->Shutdown(); 158 channel_->Shutdown();
148 channel_ = nullptr; 159 channel_ = nullptr;
160 }
161 } else {
162 if (non_transferable_state_ == CONNECT_CALLED) {
163 // We can't cancel the pending request yet, since the other side of the
164 // message pipe would want to get pending outgoing messages (if any) or
165 // at least know that this end was closed. So keep this object alive until
166 // then.
167 AddRef();
168 } else if (non_transferable_state_ == CONNECTED) {
169 internal::g_broker->CloseMessagePipe(pipe_id_, this);
170 non_transferable_state_ = CLOSED;
171 channel_ = nullptr;
172 }
149 } 173 }
150 } 174 }
151 175
152 Dispatcher::Type MessagePipeDispatcher::GetType() const { 176 Dispatcher::Type MessagePipeDispatcher::GetType() const {
153 return Type::MESSAGE_PIPE; 177 return Type::MESSAGE_PIPE;
154 } 178 }
155 179
180 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
181 base::AutoLock locker(lock());
182 channel_ = channel;
183 while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
184 channel_->WriteMessage(
185 non_transferable_outgoing_message_queue_.GetMessage());
186 }
187
188 if (is_closed()) {
189 CHECK_EQ(non_transferable_state_, CONNECT_CALLED);
190 // We kept this object alive until it's connected, we can release it now.
191 // Since we're in a callback from the Broker, call it asynchronously.
192 internal::g_io_thread_task_runner->PostTask(
193 FROM_HERE,
194 base::Bind(&Broker::CloseMessagePipe,
195 base::Unretained(internal::g_broker), pipe_id_,
196 base::Unretained(this)));
197 non_transferable_state_ = CLOSED;
198 channel_ = nullptr;
199 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
200 } else {
201 non_transferable_state_ = CONNECTED;
202 }
203 }
204
156 #if defined(OS_WIN) 205 #if defined(OS_WIN)
157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the 206 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
158 // best way we want to share this. 207 // best way we want to share this.
159 // Since this is used for serialization of messages read/written to a MP that 208 // Since this is used for serialization of messages read/written to a MP that
160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of 209 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
161 // them when a MP is being sent. As a result, even for POSIX we will probably 210 // them when a MP is being sent. As a result, even for POSIX we will probably
162 // want to send the handles to the shell process and exchange them for tokens 211 // want to send the handles to the shell process and exchange them for tokens
163 // (since we can be sure that the shell will respond to our IPCs, compared to 212 // (since we can be sure that the shell will respond to our IPCs, compared to
164 // the other end where we're sending the MP to, which may not be reading...). 213 // the other end where we're sending the MP to, which may not be reading...).
165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( 214 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
(...skipping 13 matching lines...) Expand all
179 const void* source, 228 const void* source,
180 size_t size, 229 size_t size,
181 PlatformHandleVector* platform_handles) { 230 PlatformHandleVector* platform_handles) {
182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { 231 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) {
183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; 232 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)";
184 return nullptr; 233 return nullptr;
185 } 234 }
186 235
187 const SerializedMessagePipeHandleDispatcher* serialization = 236 const SerializedMessagePipeHandleDispatcher* serialization =
188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); 237 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
238
239 scoped_refptr<MessagePipeDispatcher> rv(
240 new MessagePipeDispatcher(serialization->transferable));
241 if (!rv->transferable_) {
242 rv->InitNonTransferable(serialization->pipe_id);
243 return rv;
244 }
245
189 if (serialization->shared_memory_size != 246 if (serialization->shared_memory_size !=
190 (serialization->serialized_read_buffer_size + 247 (serialization->serialized_read_buffer_size +
191 serialization->serialized_write_buffer_size + 248 serialization->serialized_write_buffer_size +
192 serialization->serialized_message_queue_size)) { 249 serialization->serialized_message_queue_size)) {
193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; 250 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)";
194 return nullptr; 251 return nullptr;
195 } 252 }
196 253
197 ScopedPlatformHandle platform_handle, shared_memory_handle; 254 ScopedPlatformHandle platform_handle, shared_memory_handle;
198 if (!GetHandle(serialization->platform_handle_index, 255 if (!GetHandle(serialization->platform_handle_index,
(...skipping 27 matching lines...) Expand all
226 serialization->serialized_write_buffer_size; 283 serialization->serialized_write_buffer_size;
227 buffer += serialized_write_buffer_size; 284 buffer += serialized_write_buffer_size;
228 } 285 }
229 if (serialization->serialized_message_queue_size) { 286 if (serialization->serialized_message_queue_size) {
230 message_queue_data = buffer; 287 message_queue_data = buffer;
231 message_queue_size = serialization->serialized_message_queue_size; 288 message_queue_size = serialization->serialized_message_queue_size;
232 buffer += message_queue_size; 289 buffer += message_queue_size;
233 } 290 }
234 } 291 }
235 292
236 scoped_refptr<MessagePipeDispatcher> rv(
237 Create(MessagePipeDispatcher::kDefaultCreateOptions));
238 rv->write_error_ = serialization->write_error; 293 rv->write_error_ = serialization->write_error;
239 294
240 std::vector<int> serialized_read_fds; 295 std::vector<int> serialized_read_fds;
241 std::vector<int> serialized_write_fds; 296 std::vector<int> serialized_write_fds;
242 #if defined(OS_POSIX) 297 #if defined(OS_POSIX)
243 std::vector<int> serialized_fds; 298 std::vector<int> serialized_fds;
244 size_t serialized_fds_index = 0; 299 size_t serialized_fds_index = 0;
245 300
246 size_t total_fd_count = serialization->serialized_read_fds_length + 301 size_t total_fd_count = serialization->serialized_read_fds_length +
247 serialization->serialized_write_fds_length + 302 serialization->serialized_write_fds_length +
(...skipping 14 matching lines...) Expand all
262 serialized_fds_index += serialization->serialized_read_fds_length; 317 serialized_fds_index += serialization->serialized_read_fds_length;
263 serialized_write_fds.assign( 318 serialized_write_fds.assign(
264 serialized_fds.begin() + serialized_fds_index, 319 serialized_fds.begin() + serialized_fds_index,
265 serialized_fds.begin() + serialized_fds_index + 320 serialized_fds.begin() + serialized_fds_index +
266 serialization->serialized_write_fds_length); 321 serialization->serialized_write_fds_length);
267 serialized_fds_index += serialization->serialized_write_fds_length; 322 serialized_fds_index += serialization->serialized_write_fds_length;
268 #endif 323 #endif
269 324
270 while (message_queue_size) { 325 while (message_queue_size) {
271 size_t message_size; 326 size_t message_size;
272 CHECK(MessageInTransit::GetNextMessageSize( 327 if (!MessageInTransit::GetNextMessageSize(
273 message_queue_data, message_queue_size, &message_size)); 328 message_queue_data, message_queue_size, &message_size)) {
329 NOTREACHED() << "Couldn't read message size from serialized data.";
330 return nullptr;
331 }
332 if (message_size > message_queue_size) {
333 NOTREACHED() << "Invalid serialized message size.";
334 return nullptr;
335 }
274 MessageInTransit::View message_view(message_size, message_queue_data); 336 MessageInTransit::View message_view(message_size, message_queue_data);
275 message_queue_size -= message_size; 337 message_queue_size -= message_size;
276 message_queue_data += message_size; 338 message_queue_data += message_size;
277 339
278 // TODO(jam): Copied below from RawChannelWin. See commment above 340 // TODO(jam): Copied below from RawChannelWin. See commment above
279 // GetReadPlatformHandles. 341 // GetReadPlatformHandles.
280 ScopedPlatformHandleVectorPtr temp_platform_handles; 342 ScopedPlatformHandleVectorPtr temp_platform_handles;
281 if (message_view.transport_data_buffer()) { 343 if (message_view.transport_data_buffer()) {
282 size_t num_platform_handles; 344 size_t num_platform_handles;
283 const void* platform_handle_table; 345 const void* platform_handle_table;
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 &serialized_write_fds); 388 &serialized_write_fds);
327 389
328 if (message_queue_size) { // Should be empty by now. 390 if (message_queue_size) { // Should be empty by now.
329 LOG(ERROR) << "Invalid queued messages"; 391 LOG(ERROR) << "Invalid queued messages";
330 return nullptr; 392 return nullptr;
331 } 393 }
332 394
333 return rv; 395 return rv;
334 } 396 }
335 397
336 MessagePipeDispatcher::MessagePipeDispatcher() 398 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
337 : channel_(nullptr), 399 : channel_(nullptr),
338 serialized_(false),
339 serialized_read_fds_length_(0u), 400 serialized_read_fds_length_(0u),
340 serialized_write_fds_length_(0u), 401 serialized_write_fds_length_(0u),
341 serialized_message_fds_length_(0u), 402 serialized_message_fds_length_(0u),
403 pipe_id_(0),
404 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
405 serialized_(false),
342 calling_init_(false), 406 calling_init_(false),
343 write_error_(false) { 407 write_error_(false),
408 transferable_(transferable) {
344 } 409 }
345 410
346 MessagePipeDispatcher::~MessagePipeDispatcher() { 411 MessagePipeDispatcher::~MessagePipeDispatcher() {
347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The 412 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
348 // exception is if they posted a task to run CloseOnIO but the IO thread shut 413 // exception is if they posted a task to run CloseOnIO but the IO thread shut
349 // down and so when it was deleting pending tasks it caused the last reference 414 // down and so when it was deleting pending tasks it caused the last reference
350 // to destruct this object. In that case, safe to destroy the channel. 415 // to destruct this object. In that case, safe to destroy the channel.
351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 416 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
352 channel_->Shutdown(); 417 channel_->Shutdown();
353 else 418 else
354 DCHECK(!channel_); 419 DCHECK(!channel_);
355 #if defined(OS_POSIX) 420 #if defined(OS_POSIX)
356 ClosePlatformHandles(&serialized_fds_); 421 ClosePlatformHandles(&serialized_fds_);
357 #endif 422 #endif
358 } 423 }
359 424
360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { 425 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
361 lock().AssertAcquired(); 426 lock().AssertAcquired();
362 awakable_list_.CancelAll(); 427 awakable_list_.CancelAll();
363 } 428 }
364 429
365 void MessagePipeDispatcher::CloseImplNoLock() { 430 void MessagePipeDispatcher::CloseImplNoLock() {
366 lock().AssertAcquired(); 431 lock().AssertAcquired();
367 internal::g_io_thread_task_runner->PostTask( 432 internal::g_io_thread_task_runner->PostTask(
368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); 433 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
369 } 434 }
370 435
371 void MessagePipeDispatcher::SerializeInternal() { 436 void MessagePipeDispatcher::SerializeInternal() {
437 serialized_ = true;
438 if (!transferable_) {
439 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
440 << "Non transferable message pipe being sent after read/write. "
441 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
442 << "the pipe can be sent after it's read or written.";
443 return;
444 }
445
372 // We need to stop watching handle immediately, even though not on IO thread, 446 // We need to stop watching handle immediately, even though not on IO thread,
373 // so that other messages aren't read after this. 447 // so that other messages aren't read after this.
374 std::vector<int> serialized_read_fds, serialized_write_fds; 448 std::vector<int> serialized_read_fds, serialized_write_fds;
375 if (channel_) { 449 if (channel_) {
376 bool write_error = false; 450 bool write_error = false;
377 451
378 serialized_platform_handle_ = channel_->ReleaseHandle( 452 serialized_platform_handle_ = channel_->ReleaseHandle(
379 &serialized_read_buffer_, &serialized_write_buffer_, 453 &serialized_read_buffer_, &serialized_write_buffer_,
380 &serialized_read_fds, &serialized_write_fds, &write_error); 454 &serialized_read_fds, &serialized_write_fds, &write_error);
381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), 455 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(),
382 serialized_read_fds.end()); 456 serialized_read_fds.end());
383 serialized_read_fds_length_ = serialized_read_fds.size(); 457 serialized_read_fds_length_ = serialized_read_fds.size();
384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), 458 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(),
385 serialized_write_fds.end()); 459 serialized_write_fds.end());
386 serialized_write_fds_length_ = serialized_write_fds.size(); 460 serialized_write_fds_length_ = serialized_write_fds.size();
387 channel_ = nullptr; 461 channel_ = nullptr;
388 if (write_error)
389 write_error = true;
390 } else { 462 } else {
391 // It's valid that the other side wrote some data and closed its end. 463 // It's valid that the other side wrote some data and closed its end.
392 } 464 }
393 465
394 DCHECK(serialized_message_queue_.empty()); 466 DCHECK(serialized_message_queue_.empty());
395 while (!message_queue_.IsEmpty()) { 467 while (!message_queue_.IsEmpty()) {
396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); 468 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
397 469
398 // When MojoWriteMessage is called, the MessageInTransit doesn't have 470 // When MojoWriteMessage is called, the MessageInTransit doesn't have
399 // dispatchers set and CreateEquivaent... is called since the dispatchers 471 // dispatchers set and CreateEquivaent... is called since the dispatchers
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
434 &all_platform_handles->at(0), all_platform_handles->size(), tokens); 506 &all_platform_handles->at(0), all_platform_handles->size(), tokens);
435 for (size_t i = 0; i < all_platform_handles->size(); i++) 507 for (size_t i = 0; i < all_platform_handles->size(); i++)
436 all_platform_handles->at(i) = PlatformHandle(); 508 all_platform_handles->at(i) = PlatformHandle();
437 #else 509 #else
438 for (size_t i = 0; i < all_platform_handles->size(); i++) { 510 for (size_t i = 0; i < all_platform_handles->size(); i++) {
439 serialized_fds_.push_back(all_platform_handles->at(i).fd); 511 serialized_fds_.push_back(all_platform_handles->at(i).fd);
440 serialized_message_fds_length_++; 512 serialized_message_fds_length_++;
441 all_platform_handles->at(i) = PlatformHandle(); 513 all_platform_handles->at(i) = PlatformHandle();
442 } 514 }
443 #endif 515 #endif
516 }
444 517
445 serialized_message_queue_.insert( 518 serialized_message_queue_.insert(
446 serialized_message_queue_.end(), 519 serialized_message_queue_.end(),
447 static_cast<const char*>(message->transport_data()->buffer()), 520 static_cast<const char*>(message->transport_data()->buffer()),
448 static_cast<const char*>(message->transport_data()->buffer()) + 521 static_cast<const char*>(message->transport_data()->buffer()) +
449 transport_data_buffer_size); 522 transport_data_buffer_size);
450 }
451 } 523 }
452 524
453 for (size_t i = 0; i < dispatchers.size(); ++i) 525 for (size_t i = 0; i < dispatchers.size(); ++i)
454 dispatchers[i]->TransportEnded(); 526 dispatchers[i]->TransportEnded();
455 } 527 }
456
457 serialized_ = true;
458 } 528 }
459 529
460 scoped_refptr<Dispatcher> 530 scoped_refptr<Dispatcher>
461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 531 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
462 lock().AssertAcquired(); 532 lock().AssertAcquired();
463 533
464 SerializeInternal(); 534 SerializeInternal();
465 535
466 // TODO(vtl): Currently, there are no options, so we just use 536 scoped_refptr<MessagePipeDispatcher> rv(
467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options 537 new MessagePipeDispatcher(transferable_));
468 // too.
469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
471 serialized_message_queue_.swap(rv->serialized_message_queue_);
472 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
473 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
474 serialized_fds_.swap(rv->serialized_fds_);
475 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
476 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
477 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
478 rv->serialized_ = true; 538 rv->serialized_ = true;
479 rv->write_error_ = write_error_; 539 if (transferable_) {
480 return scoped_refptr<Dispatcher>(rv.get()); 540 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
541 serialized_message_queue_.swap(rv->serialized_message_queue_);
542 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
543 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
544 serialized_fds_.swap(rv->serialized_fds_);
545 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
546 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
547 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
548 rv->write_error_ = write_error_;
549 } else {
550 rv->pipe_id_ = pipe_id_;
551 rv->non_transferable_state_ = non_transferable_state_;
552 }
553 return rv;
481 } 554 }
482 555
483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( 556 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
484 const void* bytes, 557 const void* bytes,
485 uint32_t num_bytes, 558 uint32_t num_bytes,
486 std::vector<DispatcherTransport>* transports, 559 std::vector<DispatcherTransport>* transports,
487 MojoWriteMessageFlags flags) { 560 MojoWriteMessageFlags flags) {
561 lock().AssertAcquired();
488 562
489 DCHECK(!transports || 563 DCHECK(!transports ||
490 (transports->size() > 0 && 564 (transports->size() > 0 &&
491 transports->size() <= GetConfiguration().max_message_num_handles)); 565 transports->size() <= GetConfiguration().max_message_num_handles));
492 566
493 lock().AssertAcquired(); 567 if (write_error_ ||
494 568 (transferable_ && !channel_) ||
495 if (!channel_ || write_error_) 569 (!transferable_ && non_transferable_state_ == CLOSED)) {
496 return MOJO_RESULT_FAILED_PRECONDITION; 570 return MOJO_RESULT_FAILED_PRECONDITION;
571 }
497 572
498 if (num_bytes > GetConfiguration().max_message_num_bytes) 573 if (num_bytes > GetConfiguration().max_message_num_bytes)
499 return MOJO_RESULT_RESOURCE_EXHAUSTED; 574 return MOJO_RESULT_RESOURCE_EXHAUSTED;
500 scoped_ptr<MessageInTransit> message(new MessageInTransit( 575 scoped_ptr<MessageInTransit> message(new MessageInTransit(
501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); 576 MessageInTransit::Type::MESSAGE, num_bytes, bytes));
502 if (transports) { 577 if (transports) {
503 MojoResult result = AttachTransportsNoLock(message.get(), transports); 578 MojoResult result = AttachTransportsNoLock(message.get(), transports);
504 if (result != MOJO_RESULT_OK) 579 if (result != MOJO_RESULT_OK)
505 return result; 580 return result;
506 } 581 }
507 582
508 message->SerializeAndCloseDispatchers(); 583 message->SerializeAndCloseDispatchers();
509 channel_->WriteMessage(message.Pass()); 584 if (!transferable_)
585 message->set_route_id(pipe_id_);
586 if (!transferable_ &&
587 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
588 non_transferable_state_ == CONNECT_CALLED)) {
589 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
590 RequestNontransferableChannel();
591 non_transferable_outgoing_message_queue_.AddMessage(message.Pass());
592 } else {
593 channel_->WriteMessage(message.Pass());
594 }
510 595
511 return MOJO_RESULT_OK; 596 return MOJO_RESULT_OK;
512 } 597 }
513 598
514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( 599 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
515 void* bytes, 600 void* bytes,
516 uint32_t* num_bytes, 601 uint32_t* num_bytes,
517 DispatcherVector* dispatchers, 602 DispatcherVector* dispatchers,
518 uint32_t* num_dispatchers, 603 uint32_t* num_dispatchers,
519 MojoReadMessageFlags flags) { 604 MojoReadMessageFlags flags) {
520 lock().AssertAcquired(); 605 lock().AssertAcquired();
521 if (channel_) 606 if (channel_) {
522 channel_->EnsureLazyInitialized(); 607 channel_->EnsureLazyInitialized();
608 } else if (!transferable_) {
609 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
610 RequestNontransferableChannel();
611 return MOJO_RESULT_SHOULD_WAIT;
612 } else if (non_transferable_state_ == CONNECT_CALLED) {
613 return MOJO_RESULT_SHOULD_WAIT;
614 }
615 }
616
523 DCHECK(!dispatchers || dispatchers->empty()); 617 DCHECK(!dispatchers || dispatchers->empty());
524 618
525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; 619 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; 620 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
527 621
528 if (message_queue_.IsEmpty()) 622 if (message_queue_.IsEmpty())
529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; 623 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION;
530 624
531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop 625 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
532 // and release the lock immediately. 626 // and release the lock immediately.
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 return MOJO_RESULT_OK; 670 return MOJO_RESULT_OK;
577 } 671 }
578 672
579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() 673 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
580 const { 674 const {
581 lock().AssertAcquired(); 675 lock().AssertAcquired();
582 676
583 HandleSignalsState rv; 677 HandleSignalsState rv;
584 if (!message_queue_.IsEmpty()) 678 if (!message_queue_.IsEmpty())
585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 679 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
586 if (channel_ || !message_queue_.IsEmpty()) 680 if (!message_queue_.IsEmpty() ||
681 (transferable_ && channel_) ||
682 (!transferable_ && non_transferable_state_ != CLOSED))
587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 683 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
588 if (channel_ && !write_error_) { 684 if (!write_error_ &&
685 ((transferable_ && channel_) ||
686 (!transferable_ && non_transferable_state_ != CLOSED))) {
589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 687 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
591 } 689 }
592 if (!channel_ || write_error_) 690 if (write_error_ ||
691 (transferable_ && !channel_) ||
692 (!transferable_ && non_transferable_state_ == CLOSED)) {
593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 693 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
694 }
594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 695 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
595 return rv; 696 return rv;
596 } 697 }
597 698
598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( 699 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
599 Awakable* awakable, 700 Awakable* awakable,
600 MojoHandleSignals signals, 701 MojoHandleSignals signals,
601 uintptr_t context, 702 uintptr_t context,
602 HandleSignalsState* signals_state) { 703 HandleSignalsState* signals_state) {
603 lock().AssertAcquired(); 704 lock().AssertAcquired();
604 if (channel_) 705 if (channel_) {
605 channel_->EnsureLazyInitialized(); 706 channel_->EnsureLazyInitialized();
707 } else if (!transferable_ &&
708 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
709 RequestNontransferableChannel();
710 }
711
606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 712 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
607 if (state.satisfies(signals)) { 713 if (state.satisfies(signals)) {
608 if (signals_state) 714 if (signals_state)
609 *signals_state = state; 715 *signals_state = state;
610 return MOJO_RESULT_ALREADY_EXISTS; 716 return MOJO_RESULT_ALREADY_EXISTS;
611 } 717 }
612 if (!state.can_satisfy(signals)) { 718 if (!state.can_satisfy(signals)) {
613 if (signals_state) 719 if (signals_state)
614 *signals_state = state; 720 *signals_state = state;
615 return MOJO_RESULT_FAILED_PRECONDITION; 721 return MOJO_RESULT_FAILED_PRECONDITION;
(...skipping 30 matching lines...) Expand all
646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); 752 *max_size = sizeof(SerializedMessagePipeHandleDispatcher);
647 } 753 }
648 754
649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( 755 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
650 void* destination, 756 void* destination,
651 size_t* actual_size, 757 size_t* actual_size,
652 PlatformHandleVector* platform_handles) { 758 PlatformHandleVector* platform_handles) {
653 CloseImplNoLock(); 759 CloseImplNoLock();
654 SerializedMessagePipeHandleDispatcher* serialization = 760 SerializedMessagePipeHandleDispatcher* serialization =
655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); 761 static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
762 serialization->transferable = transferable_;
763 serialization->pipe_id = pipe_id_;
656 if (serialized_platform_handle_.is_valid()) { 764 if (serialized_platform_handle_.is_valid()) {
657 serialization->platform_handle_index = platform_handles->size(); 765 serialization->platform_handle_index = platform_handles->size();
658 platform_handles->push_back(serialized_platform_handle_.release()); 766 platform_handles->push_back(serialized_platform_handle_.release());
659 } else { 767 } else {
660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; 768 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
661 } 769 }
662 770
663 serialization->write_error = write_error_; 771 serialization->write_error = write_error_;
664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); 772 serialization->serialized_read_buffer_size = serialized_read_buffer_.size();
665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); 773 serialization->serialized_write_buffer_size = serialized_write_buffer_.size();
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 break; 897 break;
790 } 898 }
791 899
792 if (started_transport_.Try()) { 900 if (started_transport_.Try()) {
793 base::AutoLock locker(lock()); 901 base::AutoLock locker(lock());
794 // We can get two OnError callbacks before the post task below completes. 902 // We can get two OnError callbacks before the post task below completes.
795 // Although RawChannel still has a pointer to this object until Shutdown is 903 // Although RawChannel still has a pointer to this object until Shutdown is
796 // called, that is safe since this class always does a PostTask to the IO 904 // called, that is safe since this class always does a PostTask to the IO
797 // thread to self destruct. 905 // thread to self destruct.
798 if (channel_ && error != ERROR_WRITE) { 906 if (channel_ && error != ERROR_WRITE) {
799 channel_->Shutdown(); 907 if (transferable_) {
908 channel_->Shutdown();
909 } else {
910 CHECK_NE(non_transferable_state_, CLOSED);
911 // Since we're in a callback from the Broker, call it asynchronously.
912 internal::g_io_thread_task_runner->PostTask(
913 FROM_HERE,
914 base::Bind(&Broker::CloseMessagePipe,
915 base::Unretained(internal::g_broker), pipe_id_,
916 base::Unretained(this)));
917 non_transferable_state_ = CLOSED;
918 }
800 channel_ = nullptr; 919 channel_ = nullptr;
801 } 920 }
802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 921 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
803 started_transport_.Release(); 922 started_transport_.Release();
804 } else { 923 } else {
805 // We must be waiting to call ReleaseHandle. It will call Shutdown. 924 // We must be waiting to call ReleaseHandle. It will call Shutdown.
806 } 925 }
807 } 926 }
808 927
809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( 928 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
810 MessageInTransit* message, 929 MessageInTransit* message,
811 std::vector<DispatcherTransport>* transports) { 930 std::vector<DispatcherTransport>* transports) {
812 DCHECK(!message->has_dispatchers()); 931 DCHECK(!message->has_dispatchers());
813 932
814 // You're not allowed to send either handle to a message pipe over the message 933 // You're not allowed to send either handle to a message pipe over the message
815 // pipe, so check for this. (The case of trying to write a handle to itself is 934 // pipe, so check for this. (The case of trying to write a handle to itself is
816 // taken care of by |Core|. That case kind of makes sense, but leads to 935 // taken care of by |Core|. That case kind of makes sense, but leads to
817 // complications if, e.g., both sides try to do the same thing with their 936 // complications if, e.g., both sides try to do the same thing with their
818 // respective handles simultaneously. The other case, of trying to write the 937 // respective handles simultaneously. The other case, of trying to write the
819 // peer handle to a handle, doesn't make sense -- since no handle will be 938 // peer handle to a handle, doesn't make sense -- since no handle will be
820 // available to read the message from.) 939 // available to read the message from.)
821 for (size_t i = 0; i < transports->size(); i++) { 940 for (size_t i = 0; i < transports->size(); i++) {
822 if (!(*transports)[i].is_valid()) 941 if (!(*transports)[i].is_valid())
823 continue; 942 continue;
824 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { 943 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
825 MessagePipeDispatcher* mp = 944 MessagePipeDispatcher* mp =
826 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); 945 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
827 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { 946 if (transferable_ && mp->transferable_ &&
947 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
828 // The other case should have been disallowed by |Core|. (Note: |port| 948 // The other case should have been disallowed by |Core|. (Note: |port|
829 // is the peer port of the handle given to |WriteMessage()|.) 949 // is the peer port of the handle given to |WriteMessage()|.)
830 return MOJO_RESULT_INVALID_ARGUMENT; 950 return MOJO_RESULT_INVALID_ARGUMENT;
951 } else if (!transferable_ && !mp->transferable_ &&
952 pipe_id_ == mp->pipe_id_) {
953 return MOJO_RESULT_INVALID_ARGUMENT;
831 } 954 }
832 } 955 }
833 } 956 }
834 957
835 // Clone the dispatchers and attach them to the message. (This must be done as 958 // Clone the dispatchers and attach them to the message. (This must be done as
836 // a separate loop, since we want to leave the dispatchers alone on failure.) 959 // a separate loop, since we want to leave the dispatchers alone on failure.)
837 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); 960 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
838 dispatchers->reserve(transports->size()); 961 dispatchers->reserve(transports->size());
839 for (size_t i = 0; i < transports->size(); i++) { 962 for (size_t i = 0; i < transports->size(); i++) {
840 if ((*transports)[i].is_valid()) { 963 if ((*transports)[i].is_valid()) {
841 dispatchers->push_back( 964 dispatchers->push_back(
842 (*transports)[i].CreateEquivalentDispatcherAndClose()); 965 (*transports)[i].CreateEquivalentDispatcherAndClose());
843 } else { 966 } else {
844 LOG(WARNING) << "Enqueueing null dispatcher"; 967 LOG(WARNING) << "Enqueueing null dispatcher";
845 dispatchers->push_back(nullptr); 968 dispatchers->push_back(nullptr);
846 } 969 }
847 } 970 }
848 message->SetDispatchers(dispatchers.Pass()); 971 message->SetDispatchers(dispatchers.Pass());
849 return MOJO_RESULT_OK; 972 return MOJO_RESULT_OK;
850 } 973 }
851 974
975 void MessagePipeDispatcher::RequestNontransferableChannel() {
976 lock().AssertAcquired();
977 CHECK(!transferable_);
978 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
979 non_transferable_state_ = CONNECT_CALLED;
980
981 // PostTask since the broker can call us back synchronously.
982 internal::g_io_thread_task_runner->PostTask(
983 FROM_HERE,
984 base::Bind(&Broker::ConnectMessagePipe,
985 base::Unretained(internal::g_broker), pipe_id_,
986 base::Unretained(this)));
987 }
988
852 } // namespace edk 989 } // namespace edk
853 } // namespace mojo 990 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698