Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(632)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix chrome and POSIX Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h" 9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h" 10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/embedder/platform_handle_utils.h" 11 #include "mojo/edk/embedder/platform_handle_utils.h"
12 #include "mojo/edk/embedder/platform_shared_buffer.h" 12 #include "mojo/edk/embedder/platform_shared_buffer.h"
13 #include "mojo/edk/embedder/platform_support.h" 13 #include "mojo/edk/embedder/platform_support.h"
14 #include "mojo/edk/system/broker.h" 14 #include "mojo/edk/system/broker.h"
15 #include "mojo/edk/system/configuration.h" 15 #include "mojo/edk/system/configuration.h"
16 #include "mojo/edk/system/message_in_transit.h" 16 #include "mojo/edk/system/message_in_transit.h"
17 #include "mojo/edk/system/options_validation.h" 17 #include "mojo/edk/system/options_validation.h"
18 #include "mojo/edk/system/transport_data.h" 18 #include "mojo/edk/system/transport_data.h"
19 19
20 namespace mojo { 20 namespace mojo {
21 namespace edk { 21 namespace edk {
22 22
23 namespace { 23 namespace {
24 24
25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
26 26
27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
28 bool transferable;
29 bool write_error;
30 uint64_t pipe_id; // If transferable is false.
31 // The following members are only set if transferable is true.
28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
29 // was closed. 33 // was closed.
30 size_t platform_handle_index; 34 size_t platform_handle_index;
31 bool write_error;
32 35
33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
34 uint32_t shared_memory_size; 37 uint32_t shared_memory_size;
35 38
36 size_t serialized_read_buffer_size; 39 size_t serialized_read_buffer_size;
37 size_t serialized_write_buffer_size; 40 size_t serialized_write_buffer_size;
38 size_t serialized_message_queue_size; 41 size_t serialized_message_queue_size;
39 42
40 // These are the FDs required as part of serializing channel_ and 43 // These are the FDs required as part of serializing channel_ and
41 // message_queue_. This is only used on POSIX. 44 // message_queue_. This is only used on POSIX.
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 86
84 const MojoCreateMessagePipeOptions 87 const MojoCreateMessagePipeOptions
85 MessagePipeDispatcher::kDefaultCreateOptions = { 88 MessagePipeDispatcher::kDefaultCreateOptions = {
86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
88 91
89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
90 const MojoCreateMessagePipeOptions* in_options, 93 const MojoCreateMessagePipeOptions* in_options,
91 MojoCreateMessagePipeOptions* out_options) { 94 MojoCreateMessagePipeOptions* out_options) {
92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
94 98
95 *out_options = kDefaultCreateOptions; 99 *out_options = kDefaultCreateOptions;
96 if (!in_options) 100 if (!in_options)
97 return MOJO_RESULT_OK; 101 return MOJO_RESULT_OK;
98 102
99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
100 if (!reader.is_valid()) 104 if (!reader.is_valid())
101 return MOJO_RESULT_INVALID_ARGUMENT; 105 return MOJO_RESULT_INVALID_ARGUMENT;
102 106
103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
104 return MOJO_RESULT_OK; 108 return MOJO_RESULT_OK;
105 if ((reader.options().flags & ~kKnownFlags)) 109 if ((reader.options().flags & ~kKnownFlags))
106 return MOJO_RESULT_UNIMPLEMENTED; 110 return MOJO_RESULT_UNIMPLEMENTED;
107 out_options->flags = reader.options().flags; 111 out_options->flags = reader.options().flags;
108 112
109 // Checks for fields beyond |flags|: 113 // Checks for fields beyond |flags|:
110 114
111 // (Nothing here yet.) 115 // (Nothing here yet.)
112 116
113 return MOJO_RESULT_OK; 117 return MOJO_RESULT_OK;
114 } 118 }
115 119
116 void MessagePipeDispatcher::Init( 120 void MessagePipeDispatcher::Init(
117 ScopedPlatformHandle message_pipe, 121 ScopedPlatformHandle message_pipe,
118 char* serialized_read_buffer, size_t serialized_read_buffer_size, 122 char* serialized_read_buffer, size_t serialized_read_buffer_size,
119 char* serialized_write_buffer, size_t serialized_write_buffer_size, 123 char* serialized_write_buffer, size_t serialized_write_buffer_size,
120 std::vector<int>* serialized_read_fds, 124 std::vector<int>* serialized_read_fds,
121 std::vector<int>* serialized_write_fds) { 125 std::vector<int>* serialized_write_fds) {
126 CHECK(transferable_);
122 if (message_pipe.get().is_valid()) { 127 if (message_pipe.get().is_valid()) {
123 channel_ = RawChannel::Create(message_pipe.Pass()); 128 channel_ = RawChannel::Create(message_pipe.Pass());
124 129
125 // TODO(jam): It's probably cleaner to pass this in Init call. 130 // TODO(jam): It's probably cleaner to pass this in Init call.
126 channel_->SetSerializedData( 131 channel_->SetSerializedData(
127 serialized_read_buffer, serialized_read_buffer_size, 132 serialized_read_buffer, serialized_read_buffer_size,
128 serialized_write_buffer, serialized_write_buffer_size, 133 serialized_write_buffer, serialized_write_buffer_size,
129 serialized_read_fds, serialized_write_fds); 134 serialized_read_fds, serialized_write_fds);
130 internal::g_io_thread_task_runner->PostTask( 135 internal::g_io_thread_task_runner->PostTask(
131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
132 } 137 }
133 } 138 }
134 139
140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
141 CHECK(!transferable_);
142 pipe_id_ = pipe_id;
143 }
144
135 void MessagePipeDispatcher::InitOnIO() { 145 void MessagePipeDispatcher::InitOnIO() {
136 base::AutoLock locker(lock()); 146 base::AutoLock locker(lock());
147 CHECK(transferable_);
137 calling_init_ = true; 148 calling_init_ = true;
138 if (channel_) 149 if (channel_)
139 channel_->Init(this); 150 channel_->Init(this);
140 calling_init_ = false; 151 calling_init_ = false;
141 } 152 }
142 153
143 void MessagePipeDispatcher::CloseOnIO() { 154 void MessagePipeDispatcher::CloseOnIO() {
144 base::AutoLock locker(lock()); 155 base::AutoLock locker(lock());
145 156 if (transferable_) {
146 if (channel_) { 157 if (channel_) {
147 channel_->Shutdown(); 158 channel_->Shutdown();
148 channel_ = nullptr; 159 channel_ = nullptr;
160 }
161 } else {
162 if (non_transferable_state_ == CONNECT_CALLED) {
163 // We can't cancel the pending request yet, since the other side of the
164 // message pipe would want to get pending outgoing messages (if any) or
165 // at least know that this end was closed. So keep this object alive until
166 // then.
167 AddRef();
168 } else if (non_transferable_state_ == CONNECTED) {
169 internal::g_broker->CloseMessagePipe(pipe_id_, this);
170 non_transferable_state_ = CLOSED;
171 channel_ = nullptr;
172 }
149 } 173 }
150 } 174 }
151 175
152 Dispatcher::Type MessagePipeDispatcher::GetType() const { 176 Dispatcher::Type MessagePipeDispatcher::GetType() const {
153 return Type::MESSAGE_PIPE; 177 return Type::MESSAGE_PIPE;
154 } 178 }
155 179
180 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
181 base::AutoLock locker(lock());
182 channel_ = channel;
183 while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
184 channel_->WriteMessage(
185 non_transferable_outgoing_message_queue_.GetMessage());
186 }
187
188 if (is_closed()) {
189 CHECK_EQ(non_transferable_state_, CONNECT_CALLED);
190 // We kept this object alive until it's connected, we can release it now.
191 // Since we're in a callback from the Broker, call it asynchronously.
192 internal::g_io_thread_task_runner->PostTask(
193 FROM_HERE,
194 base::Bind(&Broker::CloseMessagePipe,
195 base::Unretained(internal::g_broker), pipe_id_,
196 base::Unretained(this)));
197 non_transferable_state_ = CLOSED;
198 channel_ = nullptr;
199 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
200 } else {
201 non_transferable_state_ = CONNECTED;
202 }
203 }
204
156 #if defined(OS_WIN) 205 #if defined(OS_WIN)
157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the 206 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
158 // best way we want to share this. 207 // best way we want to share this.
159 // Since this is used for serialization of messages read/written to a MP that 208 // Since this is used for serialization of messages read/written to a MP that
160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of 209 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
161 // them when a MP is being sent. As a result, even for POSIX we will probably 210 // them when a MP is being sent. As a result, even for POSIX we will probably
162 // want to send the handles to the shell process and exchange them for tokens 211 // want to send the handles to the shell process and exchange them for tokens
163 // (since we can be sure that the shell will respond to our IPCs, compared to 212 // (since we can be sure that the shell will respond to our IPCs, compared to
164 // the other end where we're sending the MP to, which may not be reading...). 213 // the other end where we're sending the MP to, which may not be reading...).
165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( 214 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
(...skipping 13 matching lines...) Expand all
179 const void* source, 228 const void* source,
180 size_t size, 229 size_t size,
181 PlatformHandleVector* platform_handles) { 230 PlatformHandleVector* platform_handles) {
182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { 231 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) {
183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; 232 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)";
184 return nullptr; 233 return nullptr;
185 } 234 }
186 235
187 const SerializedMessagePipeHandleDispatcher* serialization = 236 const SerializedMessagePipeHandleDispatcher* serialization =
188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); 237 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
238
239 scoped_refptr<MessagePipeDispatcher> rv(
240 new MessagePipeDispatcher(serialization->transferable));
241 if (!rv->transferable_) {
242 rv->InitNonTransferable(serialization->pipe_id);
243 return rv;
244 }
245
189 if (serialization->shared_memory_size != 246 if (serialization->shared_memory_size !=
190 (serialization->serialized_read_buffer_size + 247 (serialization->serialized_read_buffer_size +
191 serialization->serialized_write_buffer_size + 248 serialization->serialized_write_buffer_size +
192 serialization->serialized_message_queue_size)) { 249 serialization->serialized_message_queue_size)) {
193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; 250 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)";
194 return nullptr; 251 return nullptr;
195 } 252 }
196 253
197 ScopedPlatformHandle platform_handle, shared_memory_handle; 254 ScopedPlatformHandle platform_handle, shared_memory_handle;
198 if (!GetHandle(serialization->platform_handle_index, 255 if (!GetHandle(serialization->platform_handle_index,
(...skipping 27 matching lines...) Expand all
226 serialization->serialized_write_buffer_size; 283 serialization->serialized_write_buffer_size;
227 buffer += serialized_write_buffer_size; 284 buffer += serialized_write_buffer_size;
228 } 285 }
229 if (serialization->serialized_message_queue_size) { 286 if (serialization->serialized_message_queue_size) {
230 message_queue_data = buffer; 287 message_queue_data = buffer;
231 message_queue_size = serialization->serialized_message_queue_size; 288 message_queue_size = serialization->serialized_message_queue_size;
232 buffer += message_queue_size; 289 buffer += message_queue_size;
233 } 290 }
234 } 291 }
235 292
236 scoped_refptr<MessagePipeDispatcher> rv(
237 Create(MessagePipeDispatcher::kDefaultCreateOptions));
238 rv->write_error_ = serialization->write_error; 293 rv->write_error_ = serialization->write_error;
239 294
240 std::vector<int> serialized_read_fds; 295 std::vector<int> serialized_read_fds;
241 std::vector<int> serialized_write_fds; 296 std::vector<int> serialized_write_fds;
242 #if defined(OS_POSIX) 297 #if defined(OS_POSIX)
243 std::vector<int> serialized_fds; 298 std::vector<int> serialized_fds;
244 size_t serialized_fds_index = 0; 299 size_t serialized_fds_index = 0;
245 300
246 size_t total_fd_count = serialization->serialized_read_fds_length + 301 size_t total_fd_count = serialization->serialized_read_fds_length +
247 serialization->serialized_write_fds_length + 302 serialization->serialized_write_fds_length +
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 &serialized_write_fds); 381 &serialized_write_fds);
327 382
328 if (message_queue_size) { // Should be empty by now. 383 if (message_queue_size) { // Should be empty by now.
329 LOG(ERROR) << "Invalid queued messages"; 384 LOG(ERROR) << "Invalid queued messages";
330 return nullptr; 385 return nullptr;
331 } 386 }
332 387
333 return rv; 388 return rv;
334 } 389 }
335 390
336 MessagePipeDispatcher::MessagePipeDispatcher() 391 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
337 : channel_(nullptr), 392 : channel_(nullptr),
338 serialized_(false),
339 serialized_read_fds_length_(0u), 393 serialized_read_fds_length_(0u),
340 serialized_write_fds_length_(0u), 394 serialized_write_fds_length_(0u),
341 serialized_message_fds_length_(0u), 395 serialized_message_fds_length_(0u),
396 pipe_id_(0),
397 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
398 serialized_(false),
342 calling_init_(false), 399 calling_init_(false),
343 write_error_(false) { 400 write_error_(false),
401 transferable_(transferable) {
344 } 402 }
345 403
346 MessagePipeDispatcher::~MessagePipeDispatcher() { 404 MessagePipeDispatcher::~MessagePipeDispatcher() {
347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The 405 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
348 // exception is if they posted a task to run CloseOnIO but the IO thread shut 406 // exception is if they posted a task to run CloseOnIO but the IO thread shut
349 // down and so when it was deleting pending tasks it caused the last reference 407 // down and so when it was deleting pending tasks it caused the last reference
350 // to destruct this object. In that case, safe to destroy the channel. 408 // to destruct this object. In that case, safe to destroy the channel.
351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 409 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
352 channel_->Shutdown(); 410 channel_->Shutdown();
353 else 411 else
354 DCHECK(!channel_); 412 DCHECK(!channel_);
355 #if defined(OS_POSIX) 413 #if defined(OS_POSIX)
356 ClosePlatformHandles(&serialized_fds_); 414 ClosePlatformHandles(&serialized_fds_);
357 #endif 415 #endif
358 } 416 }
359 417
360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { 418 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
361 lock().AssertAcquired(); 419 lock().AssertAcquired();
362 awakable_list_.CancelAll(); 420 awakable_list_.CancelAll();
363 } 421 }
364 422
365 void MessagePipeDispatcher::CloseImplNoLock() { 423 void MessagePipeDispatcher::CloseImplNoLock() {
366 lock().AssertAcquired(); 424 lock().AssertAcquired();
367 internal::g_io_thread_task_runner->PostTask( 425 internal::g_io_thread_task_runner->PostTask(
368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); 426 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
369 } 427 }
370 428
371 void MessagePipeDispatcher::SerializeInternal() { 429 void MessagePipeDispatcher::SerializeInternal() {
430 serialized_ = true;
431 if (!transferable_) {
432 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
433 << "Non transferable message pipe being sent after read/write. "
434 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
435 << "the pipe can be sent after it's read or written.";
436 return;
437 }
438
372 // We need to stop watching handle immediately, even though not on IO thread, 439 // We need to stop watching handle immediately, even though not on IO thread,
373 // so that other messages aren't read after this. 440 // so that other messages aren't read after this.
374 std::vector<int> serialized_read_fds, serialized_write_fds; 441 std::vector<int> serialized_read_fds, serialized_write_fds;
375 if (channel_) { 442 if (channel_) {
376 bool write_error = false; 443 bool write_error = false;
377 444
378 serialized_platform_handle_ = channel_->ReleaseHandle( 445 serialized_platform_handle_ = channel_->ReleaseHandle(
379 &serialized_read_buffer_, &serialized_write_buffer_, 446 &serialized_read_buffer_, &serialized_write_buffer_,
380 &serialized_read_fds, &serialized_write_fds, &write_error); 447 &serialized_read_fds, &serialized_write_fds, &write_error);
381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), 448 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(),
382 serialized_read_fds.end()); 449 serialized_read_fds.end());
383 serialized_read_fds_length_ = serialized_read_fds.size(); 450 serialized_read_fds_length_ = serialized_read_fds.size();
384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), 451 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(),
385 serialized_write_fds.end()); 452 serialized_write_fds.end());
386 serialized_write_fds_length_ = serialized_write_fds.size(); 453 serialized_write_fds_length_ = serialized_write_fds.size();
387 channel_ = nullptr; 454 channel_ = nullptr;
388 if (write_error)
389 write_error = true;
390 } else { 455 } else {
391 // It's valid that the other side wrote some data and closed its end. 456 // It's valid that the other side wrote some data and closed its end.
392 } 457 }
393 458
394 DCHECK(serialized_message_queue_.empty()); 459 DCHECK(serialized_message_queue_.empty());
395 while (!message_queue_.IsEmpty()) { 460 while (!message_queue_.IsEmpty()) {
396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); 461 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
397 462
398 // When MojoWriteMessage is called, the MessageInTransit doesn't have 463 // When MojoWriteMessage is called, the MessageInTransit doesn't have
399 // dispatchers set and CreateEquivaent... is called since the dispatchers 464 // dispatchers set and CreateEquivaent... is called since the dispatchers
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
446 serialized_message_queue_.end(), 511 serialized_message_queue_.end(),
447 static_cast<const char*>(message->transport_data()->buffer()), 512 static_cast<const char*>(message->transport_data()->buffer()),
448 static_cast<const char*>(message->transport_data()->buffer()) + 513 static_cast<const char*>(message->transport_data()->buffer()) +
449 transport_data_buffer_size); 514 transport_data_buffer_size);
450 } 515 }
451 } 516 }
452 517
453 for (size_t i = 0; i < dispatchers.size(); ++i) 518 for (size_t i = 0; i < dispatchers.size(); ++i)
454 dispatchers[i]->TransportEnded(); 519 dispatchers[i]->TransportEnded();
455 } 520 }
456
457 serialized_ = true;
458 } 521 }
459 522
460 scoped_refptr<Dispatcher> 523 scoped_refptr<Dispatcher>
461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 524 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
462 lock().AssertAcquired(); 525 lock().AssertAcquired();
463 526
464 SerializeInternal(); 527 SerializeInternal();
465 528
466 // TODO(vtl): Currently, there are no options, so we just use 529 // TODO(vtl): Currently, there are no options, so we just use
yzshen1 2015/12/03 23:37:50 I think this comment is not useful anymore.
jam 2015/12/04 05:06:47 Done.
467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options 530 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
468 // too. 531 // too.
469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); 532 scoped_refptr<MessagePipeDispatcher> rv(
470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); 533 new MessagePipeDispatcher(transferable_));
471 serialized_message_queue_.swap(rv->serialized_message_queue_);
472 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
473 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
474 serialized_fds_.swap(rv->serialized_fds_);
475 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
476 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
477 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
478 rv->serialized_ = true; 534 rv->serialized_ = true;
479 rv->write_error_ = write_error_; 535 if (transferable_) {
480 return scoped_refptr<Dispatcher>(rv.get()); 536 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
537 serialized_message_queue_.swap(rv->serialized_message_queue_);
538 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
539 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
540 serialized_fds_.swap(rv->serialized_fds_);
541 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
542 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
543 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
544 rv->write_error_ = write_error_;
545 } else {
546 rv->pipe_id_ = pipe_id_;
547 rv->non_transferable_state_ = non_transferable_state_;
548 }
549 return rv;
481 } 550 }
482 551
483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( 552 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
484 const void* bytes, 553 const void* bytes,
485 uint32_t num_bytes, 554 uint32_t num_bytes,
486 std::vector<DispatcherTransport>* transports, 555 std::vector<DispatcherTransport>* transports,
487 MojoWriteMessageFlags flags) { 556 MojoWriteMessageFlags flags) {
557 lock().AssertAcquired();
488 558
489 DCHECK(!transports || 559 DCHECK(!transports ||
490 (transports->size() > 0 && 560 (transports->size() > 0 &&
491 transports->size() <= GetConfiguration().max_message_num_handles)); 561 transports->size() <= GetConfiguration().max_message_num_handles));
492 562
493 lock().AssertAcquired(); 563 if (write_error_ ||
494 564 (transferable_ && !channel_) ||
495 if (!channel_ || write_error_) 565 (!transferable_ && non_transferable_state_ == CLOSED)) {
496 return MOJO_RESULT_FAILED_PRECONDITION; 566 return MOJO_RESULT_FAILED_PRECONDITION;
567 }
497 568
498 if (num_bytes > GetConfiguration().max_message_num_bytes) 569 if (num_bytes > GetConfiguration().max_message_num_bytes)
499 return MOJO_RESULT_RESOURCE_EXHAUSTED; 570 return MOJO_RESULT_RESOURCE_EXHAUSTED;
500 scoped_ptr<MessageInTransit> message(new MessageInTransit( 571 scoped_ptr<MessageInTransit> message(new MessageInTransit(
501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); 572 MessageInTransit::Type::MESSAGE, num_bytes, bytes));
502 if (transports) { 573 if (transports) {
503 MojoResult result = AttachTransportsNoLock(message.get(), transports); 574 MojoResult result = AttachTransportsNoLock(message.get(), transports);
504 if (result != MOJO_RESULT_OK) 575 if (result != MOJO_RESULT_OK)
505 return result; 576 return result;
506 } 577 }
507 578
508 message->SerializeAndCloseDispatchers(); 579 message->SerializeAndCloseDispatchers();
509 channel_->WriteMessage(message.Pass()); 580 if (!transferable_)
581 message->set_route_id(pipe_id_);
582 if (!transferable_ &&
583 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
584 non_transferable_state_ == CONNECT_CALLED)) {
585 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
586 RequestNontransferableChannel();
587 non_transferable_outgoing_message_queue_.AddMessage(message.Pass());
588 } else {
589 channel_->WriteMessage(message.Pass());
590 }
510 591
511 return MOJO_RESULT_OK; 592 return MOJO_RESULT_OK;
512 } 593 }
513 594
514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( 595 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
515 void* bytes, 596 void* bytes,
516 uint32_t* num_bytes, 597 uint32_t* num_bytes,
517 DispatcherVector* dispatchers, 598 DispatcherVector* dispatchers,
518 uint32_t* num_dispatchers, 599 uint32_t* num_dispatchers,
519 MojoReadMessageFlags flags) { 600 MojoReadMessageFlags flags) {
520 lock().AssertAcquired(); 601 lock().AssertAcquired();
521 if (channel_) 602 if (channel_) {
522 channel_->EnsureLazyInitialized(); 603 channel_->EnsureLazyInitialized();
604 } else if (!transferable_ &&
605 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
606 RequestNontransferableChannel();
607 return MOJO_RESULT_SHOULD_WAIT;
608 }
609
523 DCHECK(!dispatchers || dispatchers->empty()); 610 DCHECK(!dispatchers || dispatchers->empty());
524 611
525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; 612 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; 613 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
527 614
528 if (message_queue_.IsEmpty()) 615 if (message_queue_.IsEmpty())
529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; 616 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION;
530 617
531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop 618 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
532 // and release the lock immediately. 619 // and release the lock immediately.
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 return MOJO_RESULT_OK; 663 return MOJO_RESULT_OK;
577 } 664 }
578 665
579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() 666 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
580 const { 667 const {
581 lock().AssertAcquired(); 668 lock().AssertAcquired();
582 669
583 HandleSignalsState rv; 670 HandleSignalsState rv;
584 if (!message_queue_.IsEmpty()) 671 if (!message_queue_.IsEmpty())
585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 672 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
586 if (channel_ || !message_queue_.IsEmpty()) 673 if (!message_queue_.IsEmpty() ||
674 (transferable_ && channel_) ||
675 (!transferable_ && non_transferable_state_ != CLOSED))
587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 676 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
588 if (channel_ && !write_error_) { 677 if (!write_error_ &&
678 ((transferable_ && channel_) ||
679 (!transferable_ && non_transferable_state_ != CLOSED))) {
589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 680 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 681 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
591 } 682 }
592 if (!channel_ || write_error_) 683 if (write_error_ ||
684 (transferable_ && !channel_) ||
685 (!transferable_ && non_transferable_state_ == CLOSED)) {
593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 686 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
687 }
594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
595 return rv; 689 return rv;
596 } 690 }
597 691
598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( 692 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
599 Awakable* awakable, 693 Awakable* awakable,
600 MojoHandleSignals signals, 694 MojoHandleSignals signals,
601 uintptr_t context, 695 uintptr_t context,
602 HandleSignalsState* signals_state) { 696 HandleSignalsState* signals_state) {
603 lock().AssertAcquired(); 697 lock().AssertAcquired();
604 if (channel_) 698 if (channel_) {
605 channel_->EnsureLazyInitialized(); 699 channel_->EnsureLazyInitialized();
700 } else if (!transferable_ &&
701 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
702 RequestNontransferableChannel();
703 }
704
606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 705 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
607 if (state.satisfies(signals)) { 706 if (state.satisfies(signals)) {
608 if (signals_state) 707 if (signals_state)
609 *signals_state = state; 708 *signals_state = state;
610 return MOJO_RESULT_ALREADY_EXISTS; 709 return MOJO_RESULT_ALREADY_EXISTS;
611 } 710 }
612 if (!state.can_satisfy(signals)) { 711 if (!state.can_satisfy(signals)) {
613 if (signals_state) 712 if (signals_state)
614 *signals_state = state; 713 *signals_state = state;
615 return MOJO_RESULT_FAILED_PRECONDITION; 714 return MOJO_RESULT_FAILED_PRECONDITION;
(...skipping 30 matching lines...) Expand all
646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); 745 *max_size = sizeof(SerializedMessagePipeHandleDispatcher);
647 } 746 }
648 747
649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( 748 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
650 void* destination, 749 void* destination,
651 size_t* actual_size, 750 size_t* actual_size,
652 PlatformHandleVector* platform_handles) { 751 PlatformHandleVector* platform_handles) {
653 CloseImplNoLock(); 752 CloseImplNoLock();
654 SerializedMessagePipeHandleDispatcher* serialization = 753 SerializedMessagePipeHandleDispatcher* serialization =
655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); 754 static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
755 serialization->transferable = transferable_;
756 serialization->pipe_id = pipe_id_;
656 if (serialized_platform_handle_.is_valid()) { 757 if (serialized_platform_handle_.is_valid()) {
657 serialization->platform_handle_index = platform_handles->size(); 758 serialization->platform_handle_index = platform_handles->size();
658 platform_handles->push_back(serialized_platform_handle_.release()); 759 platform_handles->push_back(serialized_platform_handle_.release());
659 } else { 760 } else {
660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; 761 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
661 } 762 }
662 763
663 serialization->write_error = write_error_; 764 serialization->write_error = write_error_;
664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); 765 serialization->serialized_read_buffer_size = serialized_read_buffer_.size();
665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); 766 serialization->serialized_write_buffer_size = serialized_write_buffer_.size();
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 break; 890 break;
790 } 891 }
791 892
792 if (started_transport_.Try()) { 893 if (started_transport_.Try()) {
793 base::AutoLock locker(lock()); 894 base::AutoLock locker(lock());
794 // We can get two OnError callbacks before the post task below completes. 895 // We can get two OnError callbacks before the post task below completes.
795 // Although RawChannel still has a pointer to this object until Shutdown is 896 // Although RawChannel still has a pointer to this object until Shutdown is
796 // called, that is safe since this class always does a PostTask to the IO 897 // called, that is safe since this class always does a PostTask to the IO
797 // thread to self destruct. 898 // thread to self destruct.
798 if (channel_ && error != ERROR_WRITE) { 899 if (channel_ && error != ERROR_WRITE) {
799 channel_->Shutdown(); 900 if (transferable_) {
901 channel_->Shutdown();
902 } else {
903 CHECK_NE(non_transferable_state_, CLOSED);
904 // Since we're in a callback from the Broker, call it asynchronously.
905 internal::g_io_thread_task_runner->PostTask(
906 FROM_HERE,
907 base::Bind(&Broker::CloseMessagePipe,
908 base::Unretained(internal::g_broker), pipe_id_,
909 base::Unretained(this)));
910 non_transferable_state_ = CLOSED;
911 }
800 channel_ = nullptr; 912 channel_ = nullptr;
801 } 913 }
802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 914 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
803 started_transport_.Release(); 915 started_transport_.Release();
804 } else { 916 } else {
805 // We must be waiting to call ReleaseHandle. It will call Shutdown. 917 // We must be waiting to call ReleaseHandle. It will call Shutdown.
806 } 918 }
807 } 919 }
808 920
809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( 921 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
842 (*transports)[i].CreateEquivalentDispatcherAndClose()); 954 (*transports)[i].CreateEquivalentDispatcherAndClose());
843 } else { 955 } else {
844 LOG(WARNING) << "Enqueueing null dispatcher"; 956 LOG(WARNING) << "Enqueueing null dispatcher";
845 dispatchers->push_back(nullptr); 957 dispatchers->push_back(nullptr);
846 } 958 }
847 } 959 }
848 message->SetDispatchers(dispatchers.Pass()); 960 message->SetDispatchers(dispatchers.Pass());
849 return MOJO_RESULT_OK; 961 return MOJO_RESULT_OK;
850 } 962 }
851 963
964 void MessagePipeDispatcher::RequestNontransferableChannel() {
965 lock().AssertAcquired();
966 CHECK(!transferable_);
967 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
968 non_transferable_state_ = CONNECT_CALLED;
969
970 // PostTask since the broker can call us back synchronously.
971 internal::g_io_thread_task_runner->PostTask(
972 FROM_HERE,
973 base::Bind(&Broker::ConnectMessagePipe,
974 base::Unretained(internal::g_broker), pipe_id_,
975 base::Unretained(this)));
976 }
977
852 } // namespace edk 978 } // namespace edk
853 } // namespace mojo 979 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698