Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(349)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: merge Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h" 9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h" 10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/embedder/platform_handle_utils.h" 11 #include "mojo/edk/embedder/platform_handle_utils.h"
12 #include "mojo/edk/embedder/platform_shared_buffer.h" 12 #include "mojo/edk/embedder/platform_shared_buffer.h"
13 #include "mojo/edk/embedder/platform_support.h" 13 #include "mojo/edk/embedder/platform_support.h"
14 #include "mojo/edk/system/broker.h" 14 #include "mojo/edk/system/broker.h"
15 #include "mojo/edk/system/configuration.h" 15 #include "mojo/edk/system/configuration.h"
16 #include "mojo/edk/system/message_in_transit.h" 16 #include "mojo/edk/system/message_in_transit.h"
17 #include "mojo/edk/system/options_validation.h" 17 #include "mojo/edk/system/options_validation.h"
18 #include "mojo/edk/system/transport_data.h" 18 #include "mojo/edk/system/transport_data.h"
19 19
20 namespace mojo { 20 namespace mojo {
21 namespace edk { 21 namespace edk {
22 22
23 namespace { 23 namespace {
24 24
25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
26 26
27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
28 bool transferable;
29 bool write_error;
30 uint64_t pipe_id; // If transferable is false.
31 // The following members are only set if transferable is true.
28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
29 // was closed. 33 // was closed.
30 size_t platform_handle_index; 34 size_t platform_handle_index;
31 bool write_error;
32 35
33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
34 uint32_t shared_memory_size; 37 uint32_t shared_memory_size;
35 38
36 size_t serialized_read_buffer_size; 39 size_t serialized_read_buffer_size;
37 size_t serialized_write_buffer_size; 40 size_t serialized_write_buffer_size;
38 size_t serialized_message_queue_size; 41 size_t serialized_message_queue_size;
39 42
40 // These are the FDs required as part of serializing channel_ and 43 // These are the FDs required as part of serializing channel_ and
41 // message_queue_. This is only used on POSIX. 44 // message_queue_. This is only used on POSIX.
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 86
84 const MojoCreateMessagePipeOptions 87 const MojoCreateMessagePipeOptions
85 MessagePipeDispatcher::kDefaultCreateOptions = { 88 MessagePipeDispatcher::kDefaultCreateOptions = {
86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
88 91
89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
90 const MojoCreateMessagePipeOptions* in_options, 93 const MojoCreateMessagePipeOptions* in_options,
91 MojoCreateMessagePipeOptions* out_options) { 94 MojoCreateMessagePipeOptions* out_options) {
92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
94 98
95 *out_options = kDefaultCreateOptions; 99 *out_options = kDefaultCreateOptions;
96 if (!in_options) 100 if (!in_options)
97 return MOJO_RESULT_OK; 101 return MOJO_RESULT_OK;
98 102
99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
100 if (!reader.is_valid()) 104 if (!reader.is_valid())
101 return MOJO_RESULT_INVALID_ARGUMENT; 105 return MOJO_RESULT_INVALID_ARGUMENT;
102 106
103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
104 return MOJO_RESULT_OK; 108 return MOJO_RESULT_OK;
105 if ((reader.options().flags & ~kKnownFlags)) 109 if ((reader.options().flags & ~kKnownFlags))
106 return MOJO_RESULT_UNIMPLEMENTED; 110 return MOJO_RESULT_UNIMPLEMENTED;
107 out_options->flags = reader.options().flags; 111 out_options->flags = reader.options().flags;
108 112
109 // Checks for fields beyond |flags|: 113 // Checks for fields beyond |flags|:
110 114
111 // (Nothing here yet.) 115 // (Nothing here yet.)
112 116
113 return MOJO_RESULT_OK; 117 return MOJO_RESULT_OK;
114 } 118 }
115 119
116 void MessagePipeDispatcher::Init( 120 void MessagePipeDispatcher::Init(
117 ScopedPlatformHandle message_pipe, 121 ScopedPlatformHandle message_pipe,
118 char* serialized_read_buffer, size_t serialized_read_buffer_size, 122 char* serialized_read_buffer, size_t serialized_read_buffer_size,
119 char* serialized_write_buffer, size_t serialized_write_buffer_size, 123 char* serialized_write_buffer, size_t serialized_write_buffer_size,
120 std::vector<int>* serialized_read_fds, 124 std::vector<int>* serialized_read_fds,
121 std::vector<int>* serialized_write_fds) { 125 std::vector<int>* serialized_write_fds) {
126 CHECK(transferable_);
122 if (message_pipe.get().is_valid()) { 127 if (message_pipe.get().is_valid()) {
123 channel_ = RawChannel::Create(message_pipe.Pass()); 128 channel_ = RawChannel::Create(message_pipe.Pass());
124 129
125 // TODO(jam): It's probably cleaner to pass this in Init call. 130 // TODO(jam): It's probably cleaner to pass this in Init call.
126 channel_->SetSerializedData( 131 channel_->SetSerializedData(
127 serialized_read_buffer, serialized_read_buffer_size, 132 serialized_read_buffer, serialized_read_buffer_size,
128 serialized_write_buffer, serialized_write_buffer_size, 133 serialized_write_buffer, serialized_write_buffer_size,
129 serialized_read_fds, serialized_write_fds); 134 serialized_read_fds, serialized_write_fds);
130 internal::g_io_thread_task_runner->PostTask( 135 internal::g_io_thread_task_runner->PostTask(
131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
132 } 137 }
133 } 138 }
134 139
140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
141 CHECK(!transferable_);
142 pipe_id_ = pipe_id;
143 }
144
135 void MessagePipeDispatcher::InitOnIO() { 145 void MessagePipeDispatcher::InitOnIO() {
136 base::AutoLock locker(lock()); 146 base::AutoLock locker(lock());
147 CHECK(transferable_);
137 calling_init_ = true; 148 calling_init_ = true;
138 if (channel_) 149 if (channel_)
139 channel_->Init(this); 150 channel_->Init(this);
140 calling_init_ = false; 151 calling_init_ = false;
141 } 152 }
142 153
143 void MessagePipeDispatcher::CloseOnIO() { 154 void MessagePipeDispatcher::CloseOnIO() {
144 base::AutoLock locker(lock()); 155 base::AutoLock locker(lock());
145 156 if (transferable_) {
146 if (channel_) { 157 if (channel_) {
147 channel_->Shutdown(); 158 channel_->Shutdown();
148 channel_ = nullptr; 159 channel_ = nullptr;
160 }
161 } else {
162 if (non_transferable_state_ == CONNECT_CALLED) {
163 // We can't cancel the pending request yet, since the other side of the
164 // message pipe would want to get pending outgoing messages (if any) or
165 // at least know that this end was closed. So keep this object alive until
166 // then.
167 AddRef();
168 } else if (non_transferable_state_ == CONNECTED) {
169 internal::g_broker->CloseMessagePipe(pipe_id_, this);
170 non_transferable_state_ = CLOSED;
171 channel_ = nullptr;
172 }
149 } 173 }
150 } 174 }
151 175
152 Dispatcher::Type MessagePipeDispatcher::GetType() const { 176 Dispatcher::Type MessagePipeDispatcher::GetType() const {
153 return Type::MESSAGE_PIPE; 177 return Type::MESSAGE_PIPE;
154 } 178 }
155 179
180 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
181 base::AutoLock locker(lock());
182 channel_ = channel;
183 while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
184 channel_->WriteMessage(
185 non_transferable_outgoing_message_queue_.GetMessage());
186 }
187
188 if (is_closed()) {
189 CHECK_EQ(non_transferable_state_, CONNECT_CALLED);
190 // We kept this object alive until it's connected, we can release it now.
191 // Since we're in a callback from the Broker, call it asynchronously.
192 internal::g_io_thread_task_runner->PostTask(
193 FROM_HERE,
194 base::Bind(&Broker::CloseMessagePipe,
195 base::Unretained(internal::g_broker), pipe_id_,
196 base::Unretained(this)));
197 non_transferable_state_ = CLOSED;
198 channel_ = nullptr;
199 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
200 } else {
201 non_transferable_state_ = CONNECTED;
202 }
203 }
204
156 #if defined(OS_WIN) 205 #if defined(OS_WIN)
157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the 206 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
158 // best way we want to share this. 207 // best way we want to share this.
159 // Since this is used for serialization of messages read/written to a MP that 208 // Since this is used for serialization of messages read/written to a MP that
160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of 209 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
161 // them when a MP is being sent. As a result, even for POSIX we will probably 210 // them when a MP is being sent. As a result, even for POSIX we will probably
162 // want to send the handles to the shell process and exchange them for tokens 211 // want to send the handles to the shell process and exchange them for tokens
163 // (since we can be sure that the shell will respond to our IPCs, compared to 212 // (since we can be sure that the shell will respond to our IPCs, compared to
164 // the other end where we're sending the MP to, which may not be reading...). 213 // the other end where we're sending the MP to, which may not be reading...).
165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( 214 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
(...skipping 13 matching lines...) Expand all
179 const void* source, 228 const void* source,
180 size_t size, 229 size_t size,
181 PlatformHandleVector* platform_handles) { 230 PlatformHandleVector* platform_handles) {
182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { 231 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) {
183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; 232 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)";
184 return nullptr; 233 return nullptr;
185 } 234 }
186 235
187 const SerializedMessagePipeHandleDispatcher* serialization = 236 const SerializedMessagePipeHandleDispatcher* serialization =
188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); 237 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
238
239 scoped_refptr<MessagePipeDispatcher> rv(
240 new MessagePipeDispatcher(serialization->transferable));
241 if (!rv->transferable_) {
242 rv->InitNonTransferable(serialization->pipe_id);
243 return rv;
244 }
245
189 if (serialization->shared_memory_size != 246 if (serialization->shared_memory_size !=
190 (serialization->serialized_read_buffer_size + 247 (serialization->serialized_read_buffer_size +
191 serialization->serialized_write_buffer_size + 248 serialization->serialized_write_buffer_size +
192 serialization->serialized_message_queue_size)) { 249 serialization->serialized_message_queue_size)) {
193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; 250 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)";
194 return nullptr; 251 return nullptr;
195 } 252 }
196 253
197 ScopedPlatformHandle platform_handle, shared_memory_handle; 254 ScopedPlatformHandle platform_handle, shared_memory_handle;
198 if (!GetHandle(serialization->platform_handle_index, 255 if (!GetHandle(serialization->platform_handle_index,
(...skipping 27 matching lines...) Expand all
226 serialization->serialized_write_buffer_size; 283 serialization->serialized_write_buffer_size;
227 buffer += serialized_write_buffer_size; 284 buffer += serialized_write_buffer_size;
228 } 285 }
229 if (serialization->serialized_message_queue_size) { 286 if (serialization->serialized_message_queue_size) {
230 message_queue_data = buffer; 287 message_queue_data = buffer;
231 message_queue_size = serialization->serialized_message_queue_size; 288 message_queue_size = serialization->serialized_message_queue_size;
232 buffer += message_queue_size; 289 buffer += message_queue_size;
233 } 290 }
234 } 291 }
235 292
236 scoped_refptr<MessagePipeDispatcher> rv(
237 Create(MessagePipeDispatcher::kDefaultCreateOptions));
238 rv->write_error_ = serialization->write_error; 293 rv->write_error_ = serialization->write_error;
239 294
240 std::vector<int> serialized_read_fds; 295 std::vector<int> serialized_read_fds;
241 std::vector<int> serialized_write_fds; 296 std::vector<int> serialized_write_fds;
242 #if defined(OS_POSIX) 297 #if defined(OS_POSIX)
243 std::vector<int> serialized_fds; 298 std::vector<int> serialized_fds;
244 size_t serialized_fds_index = 0; 299 size_t serialized_fds_index = 0;
245 300
246 size_t total_fd_count = serialization->serialized_read_fds_length + 301 size_t total_fd_count = serialization->serialized_read_fds_length +
247 serialization->serialized_write_fds_length + 302 serialization->serialized_write_fds_length +
(...skipping 14 matching lines...) Expand all
262 serialized_fds_index += serialization->serialized_read_fds_length; 317 serialized_fds_index += serialization->serialized_read_fds_length;
263 serialized_write_fds.assign( 318 serialized_write_fds.assign(
264 serialized_fds.begin() + serialized_fds_index, 319 serialized_fds.begin() + serialized_fds_index,
265 serialized_fds.begin() + serialized_fds_index + 320 serialized_fds.begin() + serialized_fds_index +
266 serialization->serialized_write_fds_length); 321 serialization->serialized_write_fds_length);
267 serialized_fds_index += serialization->serialized_write_fds_length; 322 serialized_fds_index += serialization->serialized_write_fds_length;
268 #endif 323 #endif
269 324
270 while (message_queue_size) { 325 while (message_queue_size) {
271 size_t message_size; 326 size_t message_size;
272 CHECK(MessageInTransit::GetNextMessageSize( 327 if (!MessageInTransit::GetNextMessageSize(
273 message_queue_data, message_queue_size, &message_size)); 328 message_queue_data, message_queue_size, &message_size)) {
329 NOTREACHED() << "Couldn't read message size from serialized data.";
330 return nullptr;
331 }
332 if (message_size > message_queue_size) {
333 NOTREACHED() << "Invalid serialized message size.";
334 return nullptr;
335 }
274 MessageInTransit::View message_view(message_size, message_queue_data); 336 MessageInTransit::View message_view(message_size, message_queue_data);
275 message_queue_size -= message_size; 337 message_queue_size -= message_size;
276 message_queue_data += message_size; 338 message_queue_data += message_size;
277 339
278 // TODO(jam): Copied below from RawChannelWin. See commment above 340 // TODO(jam): Copied below from RawChannelWin. See commment above
279 // GetReadPlatformHandles. 341 // GetReadPlatformHandles.
280 ScopedPlatformHandleVectorPtr temp_platform_handles; 342 ScopedPlatformHandleVectorPtr temp_platform_handles;
281 if (message_view.transport_data_buffer()) { 343 if (message_view.transport_data_buffer()) {
282 size_t num_platform_handles; 344 size_t num_platform_handles;
283 const void* platform_handle_table; 345 const void* platform_handle_table;
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 &serialized_write_fds); 388 &serialized_write_fds);
327 389
328 if (message_queue_size) { // Should be empty by now. 390 if (message_queue_size) { // Should be empty by now.
329 LOG(ERROR) << "Invalid queued messages"; 391 LOG(ERROR) << "Invalid queued messages";
330 return nullptr; 392 return nullptr;
331 } 393 }
332 394
333 return rv; 395 return rv;
334 } 396 }
335 397
336 MessagePipeDispatcher::MessagePipeDispatcher() 398 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
337 : channel_(nullptr), 399 : channel_(nullptr),
338 serialized_(false),
339 serialized_read_fds_length_(0u), 400 serialized_read_fds_length_(0u),
340 serialized_write_fds_length_(0u), 401 serialized_write_fds_length_(0u),
341 serialized_message_fds_length_(0u), 402 serialized_message_fds_length_(0u),
403 pipe_id_(0),
404 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
405 serialized_(false),
342 calling_init_(false), 406 calling_init_(false),
343 write_error_(false) { 407 write_error_(false),
408 transferable_(transferable) {
344 } 409 }
345 410
346 MessagePipeDispatcher::~MessagePipeDispatcher() { 411 MessagePipeDispatcher::~MessagePipeDispatcher() {
347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The 412 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
348 // exception is if they posted a task to run CloseOnIO but the IO thread shut 413 // exception is if they posted a task to run CloseOnIO but the IO thread shut
349 // down and so when it was deleting pending tasks it caused the last reference 414 // down and so when it was deleting pending tasks it caused the last reference
350 // to destruct this object. In that case, safe to destroy the channel. 415 // to destruct this object. In that case, safe to destroy the channel.
351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 416 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
352 channel_->Shutdown(); 417 channel_->Shutdown();
353 else 418 else
354 DCHECK(!channel_); 419 DCHECK(!channel_);
355 #if defined(OS_POSIX) 420 #if defined(OS_POSIX)
356 ClosePlatformHandles(&serialized_fds_); 421 ClosePlatformHandles(&serialized_fds_);
357 #endif 422 #endif
358 } 423 }
359 424
360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { 425 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
361 lock().AssertAcquired(); 426 lock().AssertAcquired();
362 awakable_list_.CancelAll(); 427 awakable_list_.CancelAll();
363 } 428 }
364 429
365 void MessagePipeDispatcher::CloseImplNoLock() { 430 void MessagePipeDispatcher::CloseImplNoLock() {
366 lock().AssertAcquired(); 431 lock().AssertAcquired();
367 internal::g_io_thread_task_runner->PostTask( 432 internal::g_io_thread_task_runner->PostTask(
368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); 433 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
369 } 434 }
370 435
371 void MessagePipeDispatcher::SerializeInternal() { 436 void MessagePipeDispatcher::SerializeInternal() {
437 serialized_ = true;
438 if (!transferable_) {
439 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
440 << "Non transferable message pipe being sent after read/write. "
441 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
442 << "the pipe can be sent after it's read or written.";
443 return;
444 }
445
372 // We need to stop watching handle immediately, even though not on IO thread, 446 // We need to stop watching handle immediately, even though not on IO thread,
373 // so that other messages aren't read after this. 447 // so that other messages aren't read after this.
374 std::vector<int> serialized_read_fds, serialized_write_fds; 448 std::vector<int> serialized_read_fds, serialized_write_fds;
375 if (channel_) { 449 if (channel_) {
376 bool write_error = false; 450 bool write_error = false;
377 451
378 serialized_platform_handle_ = channel_->ReleaseHandle( 452 serialized_platform_handle_ = channel_->ReleaseHandle(
379 &serialized_read_buffer_, &serialized_write_buffer_, 453 &serialized_read_buffer_, &serialized_write_buffer_,
380 &serialized_read_fds, &serialized_write_fds, &write_error); 454 &serialized_read_fds, &serialized_write_fds, &write_error);
381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), 455 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(),
382 serialized_read_fds.end()); 456 serialized_read_fds.end());
383 serialized_read_fds_length_ = serialized_read_fds.size(); 457 serialized_read_fds_length_ = serialized_read_fds.size();
384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), 458 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(),
385 serialized_write_fds.end()); 459 serialized_write_fds.end());
386 serialized_write_fds_length_ = serialized_write_fds.size(); 460 serialized_write_fds_length_ = serialized_write_fds.size();
387 channel_ = nullptr; 461 channel_ = nullptr;
388 if (write_error)
389 write_error = true;
390 } else { 462 } else {
391 // It's valid that the other side wrote some data and closed its end. 463 // It's valid that the other side wrote some data and closed its end.
392 } 464 }
393 465
394 DCHECK(serialized_message_queue_.empty()); 466 DCHECK(serialized_message_queue_.empty());
395 while (!message_queue_.IsEmpty()) { 467 while (!message_queue_.IsEmpty()) {
396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); 468 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
397 469
398 // When MojoWriteMessage is called, the MessageInTransit doesn't have 470 // When MojoWriteMessage is called, the MessageInTransit doesn't have
399 // dispatchers set and CreateEquivaent... is called since the dispatchers 471 // dispatchers set and CreateEquivaent... is called since the dispatchers
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
434 &all_platform_handles->at(0), all_platform_handles->size(), tokens); 506 &all_platform_handles->at(0), all_platform_handles->size(), tokens);
435 for (size_t i = 0; i < all_platform_handles->size(); i++) 507 for (size_t i = 0; i < all_platform_handles->size(); i++)
436 all_platform_handles->at(i) = PlatformHandle(); 508 all_platform_handles->at(i) = PlatformHandle();
437 #else 509 #else
438 for (size_t i = 0; i < all_platform_handles->size(); i++) { 510 for (size_t i = 0; i < all_platform_handles->size(); i++) {
439 serialized_fds_.push_back(all_platform_handles->at(i).fd); 511 serialized_fds_.push_back(all_platform_handles->at(i).fd);
440 serialized_message_fds_length_++; 512 serialized_message_fds_length_++;
441 all_platform_handles->at(i) = PlatformHandle(); 513 all_platform_handles->at(i) = PlatformHandle();
442 } 514 }
443 #endif 515 #endif
516 }
444 517
445 serialized_message_queue_.insert( 518 serialized_message_queue_.insert(
446 serialized_message_queue_.end(), 519 serialized_message_queue_.end(),
447 static_cast<const char*>(message->transport_data()->buffer()), 520 static_cast<const char*>(message->transport_data()->buffer()),
448 static_cast<const char*>(message->transport_data()->buffer()) + 521 static_cast<const char*>(message->transport_data()->buffer()) +
449 transport_data_buffer_size); 522 transport_data_buffer_size);
450 }
451 } 523 }
452 524
453 for (size_t i = 0; i < dispatchers.size(); ++i) 525 for (size_t i = 0; i < dispatchers.size(); ++i)
454 dispatchers[i]->TransportEnded(); 526 dispatchers[i]->TransportEnded();
455 } 527 }
456
457 serialized_ = true;
458 } 528 }
459 529
460 scoped_refptr<Dispatcher> 530 scoped_refptr<Dispatcher>
461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 531 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
462 lock().AssertAcquired(); 532 lock().AssertAcquired();
463 533
464 SerializeInternal(); 534 SerializeInternal();
465 535
466 // TODO(vtl): Currently, there are no options, so we just use 536 // TODO(vtl): Currently, there are no options, so we just use
467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options 537 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
468 // too. 538 // too.
469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); 539 scoped_refptr<MessagePipeDispatcher> rv(
470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); 540 new MessagePipeDispatcher(transferable_));
471 serialized_message_queue_.swap(rv->serialized_message_queue_);
472 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
473 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
474 serialized_fds_.swap(rv->serialized_fds_);
475 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
476 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
477 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
478 rv->serialized_ = true; 541 rv->serialized_ = true;
479 rv->write_error_ = write_error_; 542 if (transferable_) {
480 return scoped_refptr<Dispatcher>(rv.get()); 543 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
544 serialized_message_queue_.swap(rv->serialized_message_queue_);
545 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
546 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
547 serialized_fds_.swap(rv->serialized_fds_);
548 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
549 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
550 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
551 rv->write_error_ = write_error_;
552 } else {
553 rv->pipe_id_ = pipe_id_;
554 rv->non_transferable_state_ = non_transferable_state_;
555 }
556 return rv;
481 } 557 }
482 558
483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( 559 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
484 const void* bytes, 560 const void* bytes,
485 uint32_t num_bytes, 561 uint32_t num_bytes,
486 std::vector<DispatcherTransport>* transports, 562 std::vector<DispatcherTransport>* transports,
487 MojoWriteMessageFlags flags) { 563 MojoWriteMessageFlags flags) {
564 lock().AssertAcquired();
488 565
489 DCHECK(!transports || 566 DCHECK(!transports ||
490 (transports->size() > 0 && 567 (transports->size() > 0 &&
491 transports->size() <= GetConfiguration().max_message_num_handles)); 568 transports->size() <= GetConfiguration().max_message_num_handles));
492 569
493 lock().AssertAcquired(); 570 if (write_error_ ||
494 571 (transferable_ && !channel_) ||
495 if (!channel_ || write_error_) 572 (!transferable_ && non_transferable_state_ == CLOSED)) {
496 return MOJO_RESULT_FAILED_PRECONDITION; 573 return MOJO_RESULT_FAILED_PRECONDITION;
574 }
497 575
498 if (num_bytes > GetConfiguration().max_message_num_bytes) 576 if (num_bytes > GetConfiguration().max_message_num_bytes)
499 return MOJO_RESULT_RESOURCE_EXHAUSTED; 577 return MOJO_RESULT_RESOURCE_EXHAUSTED;
500 scoped_ptr<MessageInTransit> message(new MessageInTransit( 578 scoped_ptr<MessageInTransit> message(new MessageInTransit(
501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); 579 MessageInTransit::Type::MESSAGE, num_bytes, bytes));
502 if (transports) { 580 if (transports) {
503 MojoResult result = AttachTransportsNoLock(message.get(), transports); 581 MojoResult result = AttachTransportsNoLock(message.get(), transports);
504 if (result != MOJO_RESULT_OK) 582 if (result != MOJO_RESULT_OK)
505 return result; 583 return result;
506 } 584 }
507 585
508 message->SerializeAndCloseDispatchers(); 586 message->SerializeAndCloseDispatchers();
509 channel_->WriteMessage(message.Pass()); 587 if (!transferable_)
588 message->set_route_id(pipe_id_);
589 if (!transferable_ &&
590 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
591 non_transferable_state_ == CONNECT_CALLED)) {
592 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
593 RequestNontransferableChannel();
594 non_transferable_outgoing_message_queue_.AddMessage(message.Pass());
595 } else {
596 channel_->WriteMessage(message.Pass());
597 }
510 598
511 return MOJO_RESULT_OK; 599 return MOJO_RESULT_OK;
512 } 600 }
513 601
514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( 602 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
515 void* bytes, 603 void* bytes,
516 uint32_t* num_bytes, 604 uint32_t* num_bytes,
517 DispatcherVector* dispatchers, 605 DispatcherVector* dispatchers,
518 uint32_t* num_dispatchers, 606 uint32_t* num_dispatchers,
519 MojoReadMessageFlags flags) { 607 MojoReadMessageFlags flags) {
520 lock().AssertAcquired(); 608 lock().AssertAcquired();
521 if (channel_) 609 if (channel_) {
522 channel_->EnsureLazyInitialized(); 610 channel_->EnsureLazyInitialized();
611 } else if (!transferable_) {
612 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
613 RequestNontransferableChannel();
614 return MOJO_RESULT_SHOULD_WAIT;
615 } else if (non_transferable_state_ == CONNECT_CALLED) {
616 return MOJO_RESULT_SHOULD_WAIT;
617 }
618 }
619
523 DCHECK(!dispatchers || dispatchers->empty()); 620 DCHECK(!dispatchers || dispatchers->empty());
524 621
525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; 622 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; 623 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
527 624
528 if (message_queue_.IsEmpty()) 625 if (message_queue_.IsEmpty())
529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; 626 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION;
530 627
531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop 628 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
532 // and release the lock immediately. 629 // and release the lock immediately.
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 return MOJO_RESULT_OK; 673 return MOJO_RESULT_OK;
577 } 674 }
578 675
579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() 676 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
580 const { 677 const {
581 lock().AssertAcquired(); 678 lock().AssertAcquired();
582 679
583 HandleSignalsState rv; 680 HandleSignalsState rv;
584 if (!message_queue_.IsEmpty()) 681 if (!message_queue_.IsEmpty())
585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 682 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
586 if (channel_ || !message_queue_.IsEmpty()) 683 if (!message_queue_.IsEmpty() ||
684 (transferable_ && channel_) ||
685 (!transferable_ && non_transferable_state_ != CLOSED))
587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 686 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
588 if (channel_ && !write_error_) { 687 if (!write_error_ &&
688 ((transferable_ && channel_) ||
689 (!transferable_ && non_transferable_state_ != CLOSED))) {
589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 690 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 691 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
591 } 692 }
592 if (!channel_ || write_error_) 693 if (write_error_ ||
694 (transferable_ && !channel_) ||
695 (!transferable_ && non_transferable_state_ == CLOSED)) {
593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 696 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
697 }
594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 698 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
595 return rv; 699 return rv;
596 } 700 }
597 701
598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( 702 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
599 Awakable* awakable, 703 Awakable* awakable,
600 MojoHandleSignals signals, 704 MojoHandleSignals signals,
601 uintptr_t context, 705 uintptr_t context,
602 HandleSignalsState* signals_state) { 706 HandleSignalsState* signals_state) {
603 lock().AssertAcquired(); 707 lock().AssertAcquired();
604 if (channel_) 708 if (channel_) {
605 channel_->EnsureLazyInitialized(); 709 channel_->EnsureLazyInitialized();
710 } else if (!transferable_ &&
711 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
712 RequestNontransferableChannel();
713 }
714
606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 715 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
607 if (state.satisfies(signals)) { 716 if (state.satisfies(signals)) {
608 if (signals_state) 717 if (signals_state)
609 *signals_state = state; 718 *signals_state = state;
610 return MOJO_RESULT_ALREADY_EXISTS; 719 return MOJO_RESULT_ALREADY_EXISTS;
611 } 720 }
612 if (!state.can_satisfy(signals)) { 721 if (!state.can_satisfy(signals)) {
613 if (signals_state) 722 if (signals_state)
614 *signals_state = state; 723 *signals_state = state;
615 return MOJO_RESULT_FAILED_PRECONDITION; 724 return MOJO_RESULT_FAILED_PRECONDITION;
(...skipping 30 matching lines...) Expand all
646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); 755 *max_size = sizeof(SerializedMessagePipeHandleDispatcher);
647 } 756 }
648 757
649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( 758 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
650 void* destination, 759 void* destination,
651 size_t* actual_size, 760 size_t* actual_size,
652 PlatformHandleVector* platform_handles) { 761 PlatformHandleVector* platform_handles) {
653 CloseImplNoLock(); 762 CloseImplNoLock();
654 SerializedMessagePipeHandleDispatcher* serialization = 763 SerializedMessagePipeHandleDispatcher* serialization =
655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); 764 static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
765 serialization->transferable = transferable_;
766 serialization->pipe_id = pipe_id_;
656 if (serialized_platform_handle_.is_valid()) { 767 if (serialized_platform_handle_.is_valid()) {
657 serialization->platform_handle_index = platform_handles->size(); 768 serialization->platform_handle_index = platform_handles->size();
658 platform_handles->push_back(serialized_platform_handle_.release()); 769 platform_handles->push_back(serialized_platform_handle_.release());
659 } else { 770 } else {
660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; 771 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
661 } 772 }
662 773
663 serialization->write_error = write_error_; 774 serialization->write_error = write_error_;
664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); 775 serialization->serialized_read_buffer_size = serialized_read_buffer_.size();
665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); 776 serialization->serialized_write_buffer_size = serialized_write_buffer_.size();
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 break; 900 break;
790 } 901 }
791 902
792 if (started_transport_.Try()) { 903 if (started_transport_.Try()) {
793 base::AutoLock locker(lock()); 904 base::AutoLock locker(lock());
794 // We can get two OnError callbacks before the post task below completes. 905 // We can get two OnError callbacks before the post task below completes.
795 // Although RawChannel still has a pointer to this object until Shutdown is 906 // Although RawChannel still has a pointer to this object until Shutdown is
796 // called, that is safe since this class always does a PostTask to the IO 907 // called, that is safe since this class always does a PostTask to the IO
797 // thread to self destruct. 908 // thread to self destruct.
798 if (channel_ && error != ERROR_WRITE) { 909 if (channel_ && error != ERROR_WRITE) {
799 channel_->Shutdown(); 910 if (transferable_) {
911 channel_->Shutdown();
912 } else {
913 CHECK_NE(non_transferable_state_, CLOSED);
914 // Since we're in a callback from the Broker, call it asynchronously.
915 internal::g_io_thread_task_runner->PostTask(
916 FROM_HERE,
917 base::Bind(&Broker::CloseMessagePipe,
918 base::Unretained(internal::g_broker), pipe_id_,
919 base::Unretained(this)));
920 non_transferable_state_ = CLOSED;
921 }
800 channel_ = nullptr; 922 channel_ = nullptr;
801 } 923 }
802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 924 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
803 started_transport_.Release(); 925 started_transport_.Release();
804 } else { 926 } else {
805 // We must be waiting to call ReleaseHandle. It will call Shutdown. 927 // We must be waiting to call ReleaseHandle. It will call Shutdown.
806 } 928 }
807 } 929 }
808 930
809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( 931 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
810 MessageInTransit* message, 932 MessageInTransit* message,
811 std::vector<DispatcherTransport>* transports) { 933 std::vector<DispatcherTransport>* transports) {
812 DCHECK(!message->has_dispatchers()); 934 DCHECK(!message->has_dispatchers());
813 935
814 // You're not allowed to send either handle to a message pipe over the message 936 // You're not allowed to send either handle to a message pipe over the message
815 // pipe, so check for this. (The case of trying to write a handle to itself is 937 // pipe, so check for this. (The case of trying to write a handle to itself is
816 // taken care of by |Core|. That case kind of makes sense, but leads to 938 // taken care of by |Core|. That case kind of makes sense, but leads to
817 // complications if, e.g., both sides try to do the same thing with their 939 // complications if, e.g., both sides try to do the same thing with their
818 // respective handles simultaneously. The other case, of trying to write the 940 // respective handles simultaneously. The other case, of trying to write the
819 // peer handle to a handle, doesn't make sense -- since no handle will be 941 // peer handle to a handle, doesn't make sense -- since no handle will be
820 // available to read the message from.) 942 // available to read the message from.)
821 for (size_t i = 0; i < transports->size(); i++) { 943 for (size_t i = 0; i < transports->size(); i++) {
822 if (!(*transports)[i].is_valid()) 944 if (!(*transports)[i].is_valid())
823 continue; 945 continue;
824 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { 946 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
825 MessagePipeDispatcher* mp = 947 MessagePipeDispatcher* mp =
826 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); 948 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
827 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { 949 if (transferable_ && mp->transferable_ &&
950 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
828 // The other case should have been disallowed by |Core|. (Note: |port| 951 // The other case should have been disallowed by |Core|. (Note: |port|
829 // is the peer port of the handle given to |WriteMessage()|.) 952 // is the peer port of the handle given to |WriteMessage()|.)
830 return MOJO_RESULT_INVALID_ARGUMENT; 953 return MOJO_RESULT_INVALID_ARGUMENT;
954 } else if (!transferable_ && !mp->transferable_ &&
955 pipe_id_ == mp->pipe_id_) {
956 return MOJO_RESULT_INVALID_ARGUMENT;
831 } 957 }
832 } 958 }
833 } 959 }
834 960
835 // Clone the dispatchers and attach them to the message. (This must be done as 961 // Clone the dispatchers and attach them to the message. (This must be done as
836 // a separate loop, since we want to leave the dispatchers alone on failure.) 962 // a separate loop, since we want to leave the dispatchers alone on failure.)
837 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); 963 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
838 dispatchers->reserve(transports->size()); 964 dispatchers->reserve(transports->size());
839 for (size_t i = 0; i < transports->size(); i++) { 965 for (size_t i = 0; i < transports->size(); i++) {
840 if ((*transports)[i].is_valid()) { 966 if ((*transports)[i].is_valid()) {
841 dispatchers->push_back( 967 dispatchers->push_back(
842 (*transports)[i].CreateEquivalentDispatcherAndClose()); 968 (*transports)[i].CreateEquivalentDispatcherAndClose());
843 } else { 969 } else {
844 LOG(WARNING) << "Enqueueing null dispatcher"; 970 LOG(WARNING) << "Enqueueing null dispatcher";
845 dispatchers->push_back(nullptr); 971 dispatchers->push_back(nullptr);
846 } 972 }
847 } 973 }
848 message->SetDispatchers(dispatchers.Pass()); 974 message->SetDispatchers(dispatchers.Pass());
849 return MOJO_RESULT_OK; 975 return MOJO_RESULT_OK;
850 } 976 }
851 977
978 void MessagePipeDispatcher::RequestNontransferableChannel() {
979 lock().AssertAcquired();
980 CHECK(!transferable_);
981 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
982 non_transferable_state_ = CONNECT_CALLED;
983
984 // PostTask since the broker can call us back synchronously.
985 internal::g_io_thread_task_runner->PostTask(
986 FROM_HERE,
987 base::Bind(&Broker::ConnectMessagePipe,
988 base::Unretained(internal::g_broker), pipe_id_,
989 base::Unretained(this)));
990 }
991
852 } // namespace edk 992 } // namespace edk
853 } // namespace mojo 993 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698