Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/logging.h" | 8 #include "base/logging.h" |
| 9 #include "base/message_loop/message_loop.h" | 9 #include "base/message_loop/message_loop.h" |
| 10 #include "mojo/edk/embedder/embedder_internal.h" | 10 #include "mojo/edk/embedder/embedder_internal.h" |
| 11 #include "mojo/edk/embedder/platform_handle_utils.h" | 11 #include "mojo/edk/embedder/platform_handle_utils.h" |
| 12 #include "mojo/edk/embedder/platform_shared_buffer.h" | 12 #include "mojo/edk/embedder/platform_shared_buffer.h" |
| 13 #include "mojo/edk/embedder/platform_support.h" | 13 #include "mojo/edk/embedder/platform_support.h" |
| 14 #include "mojo/edk/system/broker.h" | 14 #include "mojo/edk/system/broker.h" |
| 15 #include "mojo/edk/system/configuration.h" | 15 #include "mojo/edk/system/configuration.h" |
| 16 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
| 17 #include "mojo/edk/system/options_validation.h" | 17 #include "mojo/edk/system/options_validation.h" |
| 18 #include "mojo/edk/system/transport_data.h" | 18 #include "mojo/edk/system/transport_data.h" |
| 19 | 19 |
| 20 namespace mojo { | 20 namespace mojo { |
| 21 namespace edk { | 21 namespace edk { |
| 22 | 22 |
| 23 namespace { | 23 namespace { |
| 24 | 24 |
| 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); | 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
| 26 | 26 |
| 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { | 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
| 28 bool transferable; | |
| 29 bool write_error; | |
| 30 uint64_t pipe_id; // If transferable is false. | |
| 31 // The following members are only set if transferable is true. | |
| 28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP | 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
| 29 // was closed. | 33 // was closed. |
| 30 size_t platform_handle_index; | 34 size_t platform_handle_index; |
| 31 bool write_error; | |
| 32 | 35 |
| 33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) | 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
| 34 uint32_t shared_memory_size; | 37 uint32_t shared_memory_size; |
| 35 | 38 |
| 36 size_t serialized_read_buffer_size; | 39 size_t serialized_read_buffer_size; |
| 37 size_t serialized_write_buffer_size; | 40 size_t serialized_write_buffer_size; |
| 38 size_t serialized_message_queue_size; | 41 size_t serialized_message_queue_size; |
| 39 | 42 |
| 40 // These are the FDs required as part of serializing channel_ and | 43 // These are the FDs required as part of serializing channel_ and |
| 41 // message_queue_. This is only used on POSIX. | 44 // message_queue_. This is only used on POSIX. |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 83 | 86 |
| 84 const MojoCreateMessagePipeOptions | 87 const MojoCreateMessagePipeOptions |
| 85 MessagePipeDispatcher::kDefaultCreateOptions = { | 88 MessagePipeDispatcher::kDefaultCreateOptions = { |
| 86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
| 87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
| 88 | 91 |
| 89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
| 90 const MojoCreateMessagePipeOptions* in_options, | 93 const MojoCreateMessagePipeOptions* in_options, |
| 91 MojoCreateMessagePipeOptions* out_options) { | 94 MojoCreateMessagePipeOptions* out_options) { |
| 92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
| 93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
| 97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; | |
| 94 | 98 |
| 95 *out_options = kDefaultCreateOptions; | 99 *out_options = kDefaultCreateOptions; |
| 96 if (!in_options) | 100 if (!in_options) |
| 97 return MOJO_RESULT_OK; | 101 return MOJO_RESULT_OK; |
| 98 | 102 |
| 99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
| 100 if (!reader.is_valid()) | 104 if (!reader.is_valid()) |
| 101 return MOJO_RESULT_INVALID_ARGUMENT; | 105 return MOJO_RESULT_INVALID_ARGUMENT; |
| 102 | 106 |
| 103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
| 104 return MOJO_RESULT_OK; | 108 return MOJO_RESULT_OK; |
| 105 if ((reader.options().flags & ~kKnownFlags)) | 109 if ((reader.options().flags & ~kKnownFlags)) |
| 106 return MOJO_RESULT_UNIMPLEMENTED; | 110 return MOJO_RESULT_UNIMPLEMENTED; |
| 107 out_options->flags = reader.options().flags; | 111 out_options->flags = reader.options().flags; |
| 108 | 112 |
| 109 // Checks for fields beyond |flags|: | 113 // Checks for fields beyond |flags|: |
| 110 | 114 |
| 111 // (Nothing here yet.) | 115 // (Nothing here yet.) |
| 112 | 116 |
| 113 return MOJO_RESULT_OK; | 117 return MOJO_RESULT_OK; |
| 114 } | 118 } |
| 115 | 119 |
| 116 void MessagePipeDispatcher::Init( | 120 void MessagePipeDispatcher::Init( |
| 117 ScopedPlatformHandle message_pipe, | 121 ScopedPlatformHandle message_pipe, |
| 118 char* serialized_read_buffer, size_t serialized_read_buffer_size, | 122 char* serialized_read_buffer, size_t serialized_read_buffer_size, |
| 119 char* serialized_write_buffer, size_t serialized_write_buffer_size, | 123 char* serialized_write_buffer, size_t serialized_write_buffer_size, |
| 120 std::vector<int>* serialized_read_fds, | 124 std::vector<int>* serialized_read_fds, |
| 121 std::vector<int>* serialized_write_fds) { | 125 std::vector<int>* serialized_write_fds) { |
| 126 CHECK(transferable_); | |
| 122 if (message_pipe.get().is_valid()) { | 127 if (message_pipe.get().is_valid()) { |
| 123 channel_ = RawChannel::Create(message_pipe.Pass()); | 128 channel_ = RawChannel::Create(message_pipe.Pass()); |
| 124 | 129 |
| 125 // TODO(jam): It's probably cleaner to pass this in Init call. | 130 // TODO(jam): It's probably cleaner to pass this in Init call. |
| 126 channel_->SetSerializedData( | 131 channel_->SetSerializedData( |
| 127 serialized_read_buffer, serialized_read_buffer_size, | 132 serialized_read_buffer, serialized_read_buffer_size, |
| 128 serialized_write_buffer, serialized_write_buffer_size, | 133 serialized_write_buffer, serialized_write_buffer_size, |
| 129 serialized_read_fds, serialized_write_fds); | 134 serialized_read_fds, serialized_write_fds); |
| 130 internal::g_io_thread_task_runner->PostTask( | 135 internal::g_io_thread_task_runner->PostTask( |
| 131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
| 132 } | 137 } |
| 133 } | 138 } |
| 134 | 139 |
| 140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { | |
| 141 CHECK(!transferable_); | |
| 142 pipe_id_ = pipe_id; | |
| 143 } | |
| 144 | |
| 135 void MessagePipeDispatcher::InitOnIO() { | 145 void MessagePipeDispatcher::InitOnIO() { |
| 136 base::AutoLock locker(lock()); | 146 base::AutoLock locker(lock()); |
| 147 CHECK(transferable_); | |
| 137 calling_init_ = true; | 148 calling_init_ = true; |
| 138 if (channel_) | 149 if (channel_) |
| 139 channel_->Init(this); | 150 channel_->Init(this); |
| 140 calling_init_ = false; | 151 calling_init_ = false; |
| 141 } | 152 } |
| 142 | 153 |
| 143 void MessagePipeDispatcher::CloseOnIO() { | 154 void MessagePipeDispatcher::CloseOnIO() { |
| 144 base::AutoLock locker(lock()); | 155 base::AutoLock locker(lock()); |
| 145 | 156 if (transferable_) { |
| 146 if (channel_) { | 157 if (channel_) { |
| 147 channel_->Shutdown(); | 158 channel_->Shutdown(); |
| 148 channel_ = nullptr; | 159 channel_ = nullptr; |
| 160 } | |
| 161 } else { | |
| 162 if (non_transferable_state_ == CONNECT_CALLED) { | |
| 163 // We can't cancel the pending request yet, since the other side of the | |
| 164 // message pipe would want to get pending outgoing messages (if any) or | |
| 165 // at least know that this end was closed. So keep this object alive until | |
| 166 // then. | |
| 167 AddRef(); | |
| 168 } else if (non_transferable_state_ == CONNECTED) { | |
| 169 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
| 170 non_transferable_state_ = CLOSED; | |
| 171 channel_ = nullptr; | |
| 172 } | |
| 149 } | 173 } |
| 150 } | 174 } |
| 151 | 175 |
| 152 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 176 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 153 return Type::MESSAGE_PIPE; | 177 return Type::MESSAGE_PIPE; |
| 154 } | 178 } |
| 155 | 179 |
| 180 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | |
| 181 base::AutoLock locker(lock()); | |
| 182 channel_ = channel; | |
| 183 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | |
| 184 channel_->WriteMessage( | |
| 185 non_transferable_outgoing_message_queue_.GetMessage()); | |
| 186 } | |
| 187 | |
| 188 if (is_closed()) { | |
| 189 CHECK_EQ(non_transferable_state_, CONNECT_CALLED); | |
| 190 // We kept this object alive until it's connected, we can release it now. | |
| 191 // Since we're in a callback from the Broker, call it asynchronously. | |
| 192 internal::g_io_thread_task_runner->PostTask( | |
| 193 FROM_HERE, | |
| 194 base::Bind(&Broker::CloseMessagePipe, | |
| 195 base::Unretained(internal::g_broker), pipe_id_, | |
| 196 base::Unretained(this))); | |
| 197 non_transferable_state_ = CLOSED; | |
| 198 channel_ = nullptr; | |
| 199 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | |
| 200 } else { | |
| 201 non_transferable_state_ = CONNECTED; | |
| 202 } | |
| 203 } | |
| 204 | |
| 156 #if defined(OS_WIN) | 205 #if defined(OS_WIN) |
| 157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 206 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
| 158 // best way we want to share this. | 207 // best way we want to share this. |
| 159 // Since this is used for serialization of messages read/written to a MP that | 208 // Since this is used for serialization of messages read/written to a MP that |
| 160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 209 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
| 161 // them when a MP is being sent. As a result, even for POSIX we will probably | 210 // them when a MP is being sent. As a result, even for POSIX we will probably |
| 162 // want to send the handles to the shell process and exchange them for tokens | 211 // want to send the handles to the shell process and exchange them for tokens |
| 163 // (since we can be sure that the shell will respond to our IPCs, compared to | 212 // (since we can be sure that the shell will respond to our IPCs, compared to |
| 164 // the other end where we're sending the MP to, which may not be reading...). | 213 // the other end where we're sending the MP to, which may not be reading...). |
| 165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | 214 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 179 const void* source, | 228 const void* source, |
| 180 size_t size, | 229 size_t size, |
| 181 PlatformHandleVector* platform_handles) { | 230 PlatformHandleVector* platform_handles) { |
| 182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { | 231 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { |
| 183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; | 232 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; |
| 184 return nullptr; | 233 return nullptr; |
| 185 } | 234 } |
| 186 | 235 |
| 187 const SerializedMessagePipeHandleDispatcher* serialization = | 236 const SerializedMessagePipeHandleDispatcher* serialization = |
| 188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); | 237 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
| 238 | |
| 239 scoped_refptr<MessagePipeDispatcher> rv( | |
| 240 new MessagePipeDispatcher(serialization->transferable)); | |
| 241 if (!rv->transferable_) { | |
| 242 rv->InitNonTransferable(serialization->pipe_id); | |
| 243 return rv; | |
| 244 } | |
| 245 | |
| 189 if (serialization->shared_memory_size != | 246 if (serialization->shared_memory_size != |
| 190 (serialization->serialized_read_buffer_size + | 247 (serialization->serialized_read_buffer_size + |
| 191 serialization->serialized_write_buffer_size + | 248 serialization->serialized_write_buffer_size + |
| 192 serialization->serialized_message_queue_size)) { | 249 serialization->serialized_message_queue_size)) { |
| 193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; | 250 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; |
| 194 return nullptr; | 251 return nullptr; |
| 195 } | 252 } |
| 196 | 253 |
| 197 ScopedPlatformHandle platform_handle, shared_memory_handle; | 254 ScopedPlatformHandle platform_handle, shared_memory_handle; |
| 198 if (!GetHandle(serialization->platform_handle_index, | 255 if (!GetHandle(serialization->platform_handle_index, |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 226 serialization->serialized_write_buffer_size; | 283 serialization->serialized_write_buffer_size; |
| 227 buffer += serialized_write_buffer_size; | 284 buffer += serialized_write_buffer_size; |
| 228 } | 285 } |
| 229 if (serialization->serialized_message_queue_size) { | 286 if (serialization->serialized_message_queue_size) { |
| 230 message_queue_data = buffer; | 287 message_queue_data = buffer; |
| 231 message_queue_size = serialization->serialized_message_queue_size; | 288 message_queue_size = serialization->serialized_message_queue_size; |
| 232 buffer += message_queue_size; | 289 buffer += message_queue_size; |
| 233 } | 290 } |
| 234 } | 291 } |
| 235 | 292 |
| 236 scoped_refptr<MessagePipeDispatcher> rv( | |
| 237 Create(MessagePipeDispatcher::kDefaultCreateOptions)); | |
| 238 rv->write_error_ = serialization->write_error; | 293 rv->write_error_ = serialization->write_error; |
| 239 | 294 |
| 240 std::vector<int> serialized_read_fds; | 295 std::vector<int> serialized_read_fds; |
| 241 std::vector<int> serialized_write_fds; | 296 std::vector<int> serialized_write_fds; |
| 242 #if defined(OS_POSIX) | 297 #if defined(OS_POSIX) |
| 243 std::vector<int> serialized_fds; | 298 std::vector<int> serialized_fds; |
| 244 size_t serialized_fds_index = 0; | 299 size_t serialized_fds_index = 0; |
| 245 | 300 |
| 246 size_t total_fd_count = serialization->serialized_read_fds_length + | 301 size_t total_fd_count = serialization->serialized_read_fds_length + |
| 247 serialization->serialized_write_fds_length + | 302 serialization->serialized_write_fds_length + |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 326 &serialized_write_fds); | 381 &serialized_write_fds); |
| 327 | 382 |
| 328 if (message_queue_size) { // Should be empty by now. | 383 if (message_queue_size) { // Should be empty by now. |
| 329 LOG(ERROR) << "Invalid queued messages"; | 384 LOG(ERROR) << "Invalid queued messages"; |
| 330 return nullptr; | 385 return nullptr; |
| 331 } | 386 } |
| 332 | 387 |
| 333 return rv; | 388 return rv; |
| 334 } | 389 } |
| 335 | 390 |
| 336 MessagePipeDispatcher::MessagePipeDispatcher() | 391 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
| 337 : channel_(nullptr), | 392 : channel_(nullptr), |
| 338 serialized_(false), | |
| 339 serialized_read_fds_length_(0u), | 393 serialized_read_fds_length_(0u), |
| 340 serialized_write_fds_length_(0u), | 394 serialized_write_fds_length_(0u), |
| 341 serialized_message_fds_length_(0u), | 395 serialized_message_fds_length_(0u), |
| 396 pipe_id_(0), | |
| 397 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | |
| 398 serialized_(false), | |
| 342 calling_init_(false), | 399 calling_init_(false), |
| 343 write_error_(false) { | 400 write_error_(false), |
| 401 transferable_(transferable) { | |
| 344 } | 402 } |
| 345 | 403 |
| 346 MessagePipeDispatcher::~MessagePipeDispatcher() { | 404 MessagePipeDispatcher::~MessagePipeDispatcher() { |
| 347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 405 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
| 348 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 406 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
| 349 // down and so when it was deleting pending tasks it caused the last reference | 407 // down and so when it was deleting pending tasks it caused the last reference |
| 350 // to destruct this object. In that case, safe to destroy the channel. | 408 // to destruct this object. In that case, safe to destroy the channel. |
| 351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 409 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
| 352 channel_->Shutdown(); | 410 channel_->Shutdown(); |
| 353 else | 411 else |
| 354 DCHECK(!channel_); | 412 DCHECK(!channel_); |
| 355 #if defined(OS_POSIX) | 413 #if defined(OS_POSIX) |
| 356 ClosePlatformHandles(&serialized_fds_); | 414 ClosePlatformHandles(&serialized_fds_); |
| 357 #endif | 415 #endif |
| 358 } | 416 } |
| 359 | 417 |
| 360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { | 418 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
| 361 lock().AssertAcquired(); | 419 lock().AssertAcquired(); |
| 362 awakable_list_.CancelAll(); | 420 awakable_list_.CancelAll(); |
| 363 } | 421 } |
| 364 | 422 |
| 365 void MessagePipeDispatcher::CloseImplNoLock() { | 423 void MessagePipeDispatcher::CloseImplNoLock() { |
| 366 lock().AssertAcquired(); | 424 lock().AssertAcquired(); |
| 367 internal::g_io_thread_task_runner->PostTask( | 425 internal::g_io_thread_task_runner->PostTask( |
| 368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 426 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
| 369 } | 427 } |
| 370 | 428 |
| 371 void MessagePipeDispatcher::SerializeInternal() { | 429 void MessagePipeDispatcher::SerializeInternal() { |
| 430 serialized_ = true; | |
| 431 if (!transferable_) { | |
| 432 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
| 433 << "Non transferable message pipe being sent after read/write. " | |
| 434 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | |
| 435 << "the pipe can be sent after it's read or written."; | |
| 436 return; | |
| 437 } | |
| 438 | |
| 372 // We need to stop watching handle immediately, even though not on IO thread, | 439 // We need to stop watching handle immediately, even though not on IO thread, |
| 373 // so that other messages aren't read after this. | 440 // so that other messages aren't read after this. |
| 374 std::vector<int> serialized_read_fds, serialized_write_fds; | 441 std::vector<int> serialized_read_fds, serialized_write_fds; |
| 375 if (channel_) { | 442 if (channel_) { |
| 376 bool write_error = false; | 443 bool write_error = false; |
| 377 | 444 |
| 378 serialized_platform_handle_ = channel_->ReleaseHandle( | 445 serialized_platform_handle_ = channel_->ReleaseHandle( |
| 379 &serialized_read_buffer_, &serialized_write_buffer_, | 446 &serialized_read_buffer_, &serialized_write_buffer_, |
| 380 &serialized_read_fds, &serialized_write_fds, &write_error); | 447 &serialized_read_fds, &serialized_write_fds, &write_error); |
| 381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), | 448 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), |
| 382 serialized_read_fds.end()); | 449 serialized_read_fds.end()); |
| 383 serialized_read_fds_length_ = serialized_read_fds.size(); | 450 serialized_read_fds_length_ = serialized_read_fds.size(); |
| 384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), | 451 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), |
| 385 serialized_write_fds.end()); | 452 serialized_write_fds.end()); |
| 386 serialized_write_fds_length_ = serialized_write_fds.size(); | 453 serialized_write_fds_length_ = serialized_write_fds.size(); |
| 387 channel_ = nullptr; | 454 channel_ = nullptr; |
| 388 if (write_error) | |
| 389 write_error = true; | |
| 390 } else { | 455 } else { |
| 391 // It's valid that the other side wrote some data and closed its end. | 456 // It's valid that the other side wrote some data and closed its end. |
| 392 } | 457 } |
| 393 | 458 |
| 394 DCHECK(serialized_message_queue_.empty()); | 459 DCHECK(serialized_message_queue_.empty()); |
| 395 while (!message_queue_.IsEmpty()) { | 460 while (!message_queue_.IsEmpty()) { |
| 396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); | 461 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
| 397 | 462 |
| 398 // When MojoWriteMessage is called, the MessageInTransit doesn't have | 463 // When MojoWriteMessage is called, the MessageInTransit doesn't have |
| 399 // dispatchers set and CreateEquivaent... is called since the dispatchers | 464 // dispatchers set and CreateEquivaent... is called since the dispatchers |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 446 serialized_message_queue_.end(), | 511 serialized_message_queue_.end(), |
| 447 static_cast<const char*>(message->transport_data()->buffer()), | 512 static_cast<const char*>(message->transport_data()->buffer()), |
| 448 static_cast<const char*>(message->transport_data()->buffer()) + | 513 static_cast<const char*>(message->transport_data()->buffer()) + |
| 449 transport_data_buffer_size); | 514 transport_data_buffer_size); |
| 450 } | 515 } |
| 451 } | 516 } |
| 452 | 517 |
| 453 for (size_t i = 0; i < dispatchers.size(); ++i) | 518 for (size_t i = 0; i < dispatchers.size(); ++i) |
| 454 dispatchers[i]->TransportEnded(); | 519 dispatchers[i]->TransportEnded(); |
| 455 } | 520 } |
| 456 | |
| 457 serialized_ = true; | |
| 458 } | 521 } |
| 459 | 522 |
| 460 scoped_refptr<Dispatcher> | 523 scoped_refptr<Dispatcher> |
| 461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 524 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| 462 lock().AssertAcquired(); | 525 lock().AssertAcquired(); |
| 463 | 526 |
| 464 SerializeInternal(); | 527 SerializeInternal(); |
| 465 | 528 |
| 466 // TODO(vtl): Currently, there are no options, so we just use | 529 // TODO(vtl): Currently, there are no options, so we just use |
|
yzshen1
2015/12/03 23:37:50
I think this comment is not useful anymore.
jam
2015/12/04 05:06:47
Done.
| |
| 467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | 530 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options |
| 468 // too. | 531 // too. |
| 469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); | 532 scoped_refptr<MessagePipeDispatcher> rv( |
| 470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); | 533 new MessagePipeDispatcher(transferable_)); |
| 471 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
| 472 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
| 473 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
| 474 serialized_fds_.swap(rv->serialized_fds_); | |
| 475 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
| 476 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
| 477 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
| 478 rv->serialized_ = true; | 534 rv->serialized_ = true; |
| 479 rv->write_error_ = write_error_; | 535 if (transferable_) { |
| 480 return scoped_refptr<Dispatcher>(rv.get()); | 536 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
| 537 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
| 538 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
| 539 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
| 540 serialized_fds_.swap(rv->serialized_fds_); | |
| 541 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
| 542 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
| 543 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
| 544 rv->write_error_ = write_error_; | |
| 545 } else { | |
| 546 rv->pipe_id_ = pipe_id_; | |
| 547 rv->non_transferable_state_ = non_transferable_state_; | |
| 548 } | |
| 549 return rv; | |
| 481 } | 550 } |
| 482 | 551 |
| 483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | 552 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
| 484 const void* bytes, | 553 const void* bytes, |
| 485 uint32_t num_bytes, | 554 uint32_t num_bytes, |
| 486 std::vector<DispatcherTransport>* transports, | 555 std::vector<DispatcherTransport>* transports, |
| 487 MojoWriteMessageFlags flags) { | 556 MojoWriteMessageFlags flags) { |
| 557 lock().AssertAcquired(); | |
| 488 | 558 |
| 489 DCHECK(!transports || | 559 DCHECK(!transports || |
| 490 (transports->size() > 0 && | 560 (transports->size() > 0 && |
| 491 transports->size() <= GetConfiguration().max_message_num_handles)); | 561 transports->size() <= GetConfiguration().max_message_num_handles)); |
| 492 | 562 |
| 493 lock().AssertAcquired(); | 563 if (write_error_ || |
| 494 | 564 (transferable_ && !channel_) || |
| 495 if (!channel_ || write_error_) | 565 (!transferable_ && non_transferable_state_ == CLOSED)) { |
| 496 return MOJO_RESULT_FAILED_PRECONDITION; | 566 return MOJO_RESULT_FAILED_PRECONDITION; |
| 567 } | |
| 497 | 568 |
| 498 if (num_bytes > GetConfiguration().max_message_num_bytes) | 569 if (num_bytes > GetConfiguration().max_message_num_bytes) |
| 499 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 570 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 500 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 571 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); | 572 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
| 502 if (transports) { | 573 if (transports) { |
| 503 MojoResult result = AttachTransportsNoLock(message.get(), transports); | 574 MojoResult result = AttachTransportsNoLock(message.get(), transports); |
| 504 if (result != MOJO_RESULT_OK) | 575 if (result != MOJO_RESULT_OK) |
| 505 return result; | 576 return result; |
| 506 } | 577 } |
| 507 | 578 |
| 508 message->SerializeAndCloseDispatchers(); | 579 message->SerializeAndCloseDispatchers(); |
| 509 channel_->WriteMessage(message.Pass()); | 580 if (!transferable_) |
| 581 message->set_route_id(pipe_id_); | |
| 582 if (!transferable_ && | |
| 583 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || | |
| 584 non_transferable_state_ == CONNECT_CALLED)) { | |
| 585 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
| 586 RequestNontransferableChannel(); | |
| 587 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); | |
| 588 } else { | |
| 589 channel_->WriteMessage(message.Pass()); | |
| 590 } | |
| 510 | 591 |
| 511 return MOJO_RESULT_OK; | 592 return MOJO_RESULT_OK; |
| 512 } | 593 } |
| 513 | 594 |
| 514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 595 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
| 515 void* bytes, | 596 void* bytes, |
| 516 uint32_t* num_bytes, | 597 uint32_t* num_bytes, |
| 517 DispatcherVector* dispatchers, | 598 DispatcherVector* dispatchers, |
| 518 uint32_t* num_dispatchers, | 599 uint32_t* num_dispatchers, |
| 519 MojoReadMessageFlags flags) { | 600 MojoReadMessageFlags flags) { |
| 520 lock().AssertAcquired(); | 601 lock().AssertAcquired(); |
| 521 if (channel_) | 602 if (channel_) { |
| 522 channel_->EnsureLazyInitialized(); | 603 channel_->EnsureLazyInitialized(); |
| 604 } else if (!transferable_ && | |
| 605 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
| 606 RequestNontransferableChannel(); | |
| 607 return MOJO_RESULT_SHOULD_WAIT; | |
| 608 } | |
| 609 | |
| 523 DCHECK(!dispatchers || dispatchers->empty()); | 610 DCHECK(!dispatchers || dispatchers->empty()); |
| 524 | 611 |
| 525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; | 612 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
| 526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | 613 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
| 527 | 614 |
| 528 if (message_queue_.IsEmpty()) | 615 if (message_queue_.IsEmpty()) |
| 529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; | 616 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; |
| 530 | 617 |
| 531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | 618 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
| 532 // and release the lock immediately. | 619 // and release the lock immediately. |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 576 return MOJO_RESULT_OK; | 663 return MOJO_RESULT_OK; |
| 577 } | 664 } |
| 578 | 665 |
| 579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | 666 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
| 580 const { | 667 const { |
| 581 lock().AssertAcquired(); | 668 lock().AssertAcquired(); |
| 582 | 669 |
| 583 HandleSignalsState rv; | 670 HandleSignalsState rv; |
| 584 if (!message_queue_.IsEmpty()) | 671 if (!message_queue_.IsEmpty()) |
| 585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 672 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 586 if (channel_ || !message_queue_.IsEmpty()) | 673 if (!message_queue_.IsEmpty() || |
| 674 (transferable_ && channel_) || | |
| 675 (!transferable_ && non_transferable_state_ != CLOSED)) | |
| 587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 676 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 588 if (channel_ && !write_error_) { | 677 if (!write_error_ && |
| 678 ((transferable_ && channel_) || | |
| 679 (!transferable_ && non_transferable_state_ != CLOSED))) { | |
| 589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 680 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 681 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 591 } | 682 } |
| 592 if (!channel_ || write_error_) | 683 if (write_error_ || |
| 684 (transferable_ && !channel_) || | |
| 685 (!transferable_ && non_transferable_state_ == CLOSED)) { | |
| 593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 686 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 687 } | |
| 594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 595 return rv; | 689 return rv; |
| 596 } | 690 } |
| 597 | 691 |
| 598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( | 692 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
| 599 Awakable* awakable, | 693 Awakable* awakable, |
| 600 MojoHandleSignals signals, | 694 MojoHandleSignals signals, |
| 601 uintptr_t context, | 695 uintptr_t context, |
| 602 HandleSignalsState* signals_state) { | 696 HandleSignalsState* signals_state) { |
| 603 lock().AssertAcquired(); | 697 lock().AssertAcquired(); |
| 604 if (channel_) | 698 if (channel_) { |
| 605 channel_->EnsureLazyInitialized(); | 699 channel_->EnsureLazyInitialized(); |
| 700 } else if (!transferable_ && | |
| 701 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
| 702 RequestNontransferableChannel(); | |
| 703 } | |
| 704 | |
| 606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 705 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| 607 if (state.satisfies(signals)) { | 706 if (state.satisfies(signals)) { |
| 608 if (signals_state) | 707 if (signals_state) |
| 609 *signals_state = state; | 708 *signals_state = state; |
| 610 return MOJO_RESULT_ALREADY_EXISTS; | 709 return MOJO_RESULT_ALREADY_EXISTS; |
| 611 } | 710 } |
| 612 if (!state.can_satisfy(signals)) { | 711 if (!state.can_satisfy(signals)) { |
| 613 if (signals_state) | 712 if (signals_state) |
| 614 *signals_state = state; | 713 *signals_state = state; |
| 615 return MOJO_RESULT_FAILED_PRECONDITION; | 714 return MOJO_RESULT_FAILED_PRECONDITION; |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); | 745 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); |
| 647 } | 746 } |
| 648 | 747 |
| 649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | 748 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
| 650 void* destination, | 749 void* destination, |
| 651 size_t* actual_size, | 750 size_t* actual_size, |
| 652 PlatformHandleVector* platform_handles) { | 751 PlatformHandleVector* platform_handles) { |
| 653 CloseImplNoLock(); | 752 CloseImplNoLock(); |
| 654 SerializedMessagePipeHandleDispatcher* serialization = | 753 SerializedMessagePipeHandleDispatcher* serialization = |
| 655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); | 754 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
| 755 serialization->transferable = transferable_; | |
| 756 serialization->pipe_id = pipe_id_; | |
| 656 if (serialized_platform_handle_.is_valid()) { | 757 if (serialized_platform_handle_.is_valid()) { |
| 657 serialization->platform_handle_index = platform_handles->size(); | 758 serialization->platform_handle_index = platform_handles->size(); |
| 658 platform_handles->push_back(serialized_platform_handle_.release()); | 759 platform_handles->push_back(serialized_platform_handle_.release()); |
| 659 } else { | 760 } else { |
| 660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; | 761 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
| 661 } | 762 } |
| 662 | 763 |
| 663 serialization->write_error = write_error_; | 764 serialization->write_error = write_error_; |
| 664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); | 765 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); |
| 665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); | 766 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 789 break; | 890 break; |
| 790 } | 891 } |
| 791 | 892 |
| 792 if (started_transport_.Try()) { | 893 if (started_transport_.Try()) { |
| 793 base::AutoLock locker(lock()); | 894 base::AutoLock locker(lock()); |
| 794 // We can get two OnError callbacks before the post task below completes. | 895 // We can get two OnError callbacks before the post task below completes. |
| 795 // Although RawChannel still has a pointer to this object until Shutdown is | 896 // Although RawChannel still has a pointer to this object until Shutdown is |
| 796 // called, that is safe since this class always does a PostTask to the IO | 897 // called, that is safe since this class always does a PostTask to the IO |
| 797 // thread to self destruct. | 898 // thread to self destruct. |
| 798 if (channel_ && error != ERROR_WRITE) { | 899 if (channel_ && error != ERROR_WRITE) { |
| 799 channel_->Shutdown(); | 900 if (transferable_) { |
| 901 channel_->Shutdown(); | |
| 902 } else { | |
| 903 CHECK_NE(non_transferable_state_, CLOSED); | |
| 904 // Since we're in a callback from the Broker, call it asynchronously. | |
| 905 internal::g_io_thread_task_runner->PostTask( | |
| 906 FROM_HERE, | |
| 907 base::Bind(&Broker::CloseMessagePipe, | |
| 908 base::Unretained(internal::g_broker), pipe_id_, | |
| 909 base::Unretained(this))); | |
| 910 non_transferable_state_ = CLOSED; | |
| 911 } | |
| 800 channel_ = nullptr; | 912 channel_ = nullptr; |
| 801 } | 913 } |
| 802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 914 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 803 started_transport_.Release(); | 915 started_transport_.Release(); |
| 804 } else { | 916 } else { |
| 805 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 917 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| 806 } | 918 } |
| 807 } | 919 } |
| 808 | 920 |
| 809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 921 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 842 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 954 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
| 843 } else { | 955 } else { |
| 844 LOG(WARNING) << "Enqueueing null dispatcher"; | 956 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 845 dispatchers->push_back(nullptr); | 957 dispatchers->push_back(nullptr); |
| 846 } | 958 } |
| 847 } | 959 } |
| 848 message->SetDispatchers(dispatchers.Pass()); | 960 message->SetDispatchers(dispatchers.Pass()); |
| 849 return MOJO_RESULT_OK; | 961 return MOJO_RESULT_OK; |
| 850 } | 962 } |
| 851 | 963 |
| 964 void MessagePipeDispatcher::RequestNontransferableChannel() { | |
| 965 lock().AssertAcquired(); | |
| 966 CHECK(!transferable_); | |
| 967 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); | |
| 968 non_transferable_state_ = CONNECT_CALLED; | |
| 969 | |
| 970 // PostTask since the broker can call us back synchronously. | |
| 971 internal::g_io_thread_task_runner->PostTask( | |
| 972 FROM_HERE, | |
| 973 base::Bind(&Broker::ConnectMessagePipe, | |
| 974 base::Unretained(internal::g_broker), pipe_id_, | |
| 975 base::Unretained(this))); | |
| 976 } | |
| 977 | |
| 852 } // namespace edk | 978 } // namespace edk |
| 853 } // namespace mojo | 979 } // namespace mojo |
| OLD | NEW |