| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/logging.h" | 8 #include "base/logging.h" |
| 9 #include "base/message_loop/message_loop.h" | 9 #include "base/message_loop/message_loop.h" |
| 10 #include "mojo/edk/embedder/embedder_internal.h" | 10 #include "mojo/edk/embedder/embedder_internal.h" |
| 11 #include "mojo/edk/embedder/platform_handle_utils.h" | 11 #include "mojo/edk/embedder/platform_handle_utils.h" |
| 12 #include "mojo/edk/embedder/platform_shared_buffer.h" | 12 #include "mojo/edk/embedder/platform_shared_buffer.h" |
| 13 #include "mojo/edk/embedder/platform_support.h" | 13 #include "mojo/edk/embedder/platform_support.h" |
| 14 #include "mojo/edk/system/broker.h" | 14 #include "mojo/edk/system/broker.h" |
| 15 #include "mojo/edk/system/configuration.h" | 15 #include "mojo/edk/system/configuration.h" |
| 16 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
| 17 #include "mojo/edk/system/options_validation.h" | 17 #include "mojo/edk/system/options_validation.h" |
| 18 #include "mojo/edk/system/transport_data.h" | 18 #include "mojo/edk/system/transport_data.h" |
| 19 | 19 |
| 20 namespace mojo { | 20 namespace mojo { |
| 21 namespace edk { | 21 namespace edk { |
| 22 | 22 |
| 23 namespace { | 23 namespace { |
| 24 | 24 |
| 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); | 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
| 26 | 26 |
| 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { | 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
| 28 bool transferable; |
| 29 bool write_error; |
| 30 uint64_t pipe_id; // If transferable is false. |
| 31 // The following members are only set if transferable is true. |
| 28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP | 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
| 29 // was closed. | 33 // was closed. |
| 30 size_t platform_handle_index; | 34 size_t platform_handle_index; |
| 31 bool write_error; | |
| 32 | 35 |
| 33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) | 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
| 34 uint32_t shared_memory_size; | 37 uint32_t shared_memory_size; |
| 35 | 38 |
| 36 size_t serialized_read_buffer_size; | 39 size_t serialized_read_buffer_size; |
| 37 size_t serialized_write_buffer_size; | 40 size_t serialized_write_buffer_size; |
| 38 size_t serialized_message_queue_size; | 41 size_t serialized_message_queue_size; |
| 39 | 42 |
| 40 // These are the FDs required as part of serializing channel_ and | 43 // These are the FDs required as part of serializing channel_ and |
| 41 // message_queue_. This is only used on POSIX. | 44 // message_queue_. This is only used on POSIX. |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 83 | 86 |
| 84 const MojoCreateMessagePipeOptions | 87 const MojoCreateMessagePipeOptions |
| 85 MessagePipeDispatcher::kDefaultCreateOptions = { | 88 MessagePipeDispatcher::kDefaultCreateOptions = { |
| 86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
| 87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
| 88 | 91 |
| 89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
| 90 const MojoCreateMessagePipeOptions* in_options, | 93 const MojoCreateMessagePipeOptions* in_options, |
| 91 MojoCreateMessagePipeOptions* out_options) { | 94 MojoCreateMessagePipeOptions* out_options) { |
| 92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
| 93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
| 97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; |
| 94 | 98 |
| 95 *out_options = kDefaultCreateOptions; | 99 *out_options = kDefaultCreateOptions; |
| 96 if (!in_options) | 100 if (!in_options) |
| 97 return MOJO_RESULT_OK; | 101 return MOJO_RESULT_OK; |
| 98 | 102 |
| 99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
| 100 if (!reader.is_valid()) | 104 if (!reader.is_valid()) |
| 101 return MOJO_RESULT_INVALID_ARGUMENT; | 105 return MOJO_RESULT_INVALID_ARGUMENT; |
| 102 | 106 |
| 103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
| 104 return MOJO_RESULT_OK; | 108 return MOJO_RESULT_OK; |
| 105 if ((reader.options().flags & ~kKnownFlags)) | 109 if ((reader.options().flags & ~kKnownFlags)) |
| 106 return MOJO_RESULT_UNIMPLEMENTED; | 110 return MOJO_RESULT_UNIMPLEMENTED; |
| 107 out_options->flags = reader.options().flags; | 111 out_options->flags = reader.options().flags; |
| 108 | 112 |
| 109 // Checks for fields beyond |flags|: | 113 // Checks for fields beyond |flags|: |
| 110 | 114 |
| 111 // (Nothing here yet.) | 115 // (Nothing here yet.) |
| 112 | 116 |
| 113 return MOJO_RESULT_OK; | 117 return MOJO_RESULT_OK; |
| 114 } | 118 } |
| 115 | 119 |
| 116 void MessagePipeDispatcher::Init( | 120 void MessagePipeDispatcher::Init( |
| 117 ScopedPlatformHandle message_pipe, | 121 ScopedPlatformHandle message_pipe, |
| 118 char* serialized_read_buffer, size_t serialized_read_buffer_size, | 122 char* serialized_read_buffer, size_t serialized_read_buffer_size, |
| 119 char* serialized_write_buffer, size_t serialized_write_buffer_size, | 123 char* serialized_write_buffer, size_t serialized_write_buffer_size, |
| 120 std::vector<int>* serialized_read_fds, | 124 std::vector<int>* serialized_read_fds, |
| 121 std::vector<int>* serialized_write_fds) { | 125 std::vector<int>* serialized_write_fds) { |
| 126 CHECK(transferable_); |
| 122 if (message_pipe.get().is_valid()) { | 127 if (message_pipe.get().is_valid()) { |
| 123 channel_ = RawChannel::Create(message_pipe.Pass()); | 128 channel_ = RawChannel::Create(message_pipe.Pass()); |
| 124 | 129 |
| 125 // TODO(jam): It's probably cleaner to pass this in Init call. | 130 // TODO(jam): It's probably cleaner to pass this in Init call. |
| 126 channel_->SetSerializedData( | 131 channel_->SetSerializedData( |
| 127 serialized_read_buffer, serialized_read_buffer_size, | 132 serialized_read_buffer, serialized_read_buffer_size, |
| 128 serialized_write_buffer, serialized_write_buffer_size, | 133 serialized_write_buffer, serialized_write_buffer_size, |
| 129 serialized_read_fds, serialized_write_fds); | 134 serialized_read_fds, serialized_write_fds); |
| 130 internal::g_io_thread_task_runner->PostTask( | 135 internal::g_io_thread_task_runner->PostTask( |
| 131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
| 132 } | 137 } |
| 133 } | 138 } |
| 134 | 139 |
| 140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { |
| 141 CHECK(!transferable_); |
| 142 pipe_id_ = pipe_id; |
| 143 } |
| 144 |
| 135 void MessagePipeDispatcher::InitOnIO() { | 145 void MessagePipeDispatcher::InitOnIO() { |
| 136 base::AutoLock locker(lock()); | 146 base::AutoLock locker(lock()); |
| 147 CHECK(transferable_); |
| 137 calling_init_ = true; | 148 calling_init_ = true; |
| 138 if (channel_) | 149 if (channel_) |
| 139 channel_->Init(this); | 150 channel_->Init(this); |
| 140 calling_init_ = false; | 151 calling_init_ = false; |
| 141 } | 152 } |
| 142 | 153 |
| 143 void MessagePipeDispatcher::CloseOnIO() { | 154 void MessagePipeDispatcher::CloseOnIO() { |
| 144 base::AutoLock locker(lock()); | 155 base::AutoLock locker(lock()); |
| 156 if (transferable_) { |
| 157 if (channel_) { |
| 158 channel_->Shutdown(); |
| 159 channel_ = nullptr; |
| 160 } |
| 161 } else { |
| 162 if (non_transferable_state_ == CONNECT_CALLED || |
| 163 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 164 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 165 RequestNontransferableChannel(); |
| 145 | 166 |
| 146 if (channel_) { | 167 // We can't cancel the pending request yet, since the other side of the |
| 147 channel_->Shutdown(); | 168 // message pipe would want to get pending outgoing messages (if any) or |
| 148 channel_ = nullptr; | 169 // at least know that this end was closed. So keep this object alive until |
| 170 // then. |
| 171 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
| 172 AddRef(); |
| 173 } else if (non_transferable_state_ == CONNECTED) { |
| 174 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| 175 non_transferable_state_ = CLOSED; |
| 176 channel_ = nullptr; |
| 177 } |
| 149 } | 178 } |
| 150 } | 179 } |
| 151 | 180 |
| 152 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 181 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 153 return Type::MESSAGE_PIPE; | 182 return Type::MESSAGE_PIPE; |
| 154 } | 183 } |
| 155 | 184 |
| 185 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| 186 base::AutoLock locker(lock()); |
| 187 channel_ = channel; |
| 188 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| 189 channel_->WriteMessage( |
| 190 non_transferable_outgoing_message_queue_.GetMessage()); |
| 191 } |
| 192 |
| 193 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
| 194 // We kept this object alive until it's connected, we can release it now. |
| 195 // Since we're in a callback from the Broker, call it asynchronously. |
| 196 internal::g_io_thread_task_runner->PostTask( |
| 197 FROM_HERE, |
| 198 base::Bind(&Broker::CloseMessagePipe, |
| 199 base::Unretained(internal::g_broker), pipe_id_, |
| 200 base::Unretained(this))); |
| 201 non_transferable_state_ = CLOSED; |
| 202 channel_ = nullptr; |
| 203 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| 204 } else { |
| 205 non_transferable_state_ = CONNECTED; |
| 206 } |
| 207 } |
| 208 |
| 156 #if defined(OS_WIN) | 209 #if defined(OS_WIN) |
| 157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 210 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
| 158 // best way we want to share this. | 211 // best way we want to share this. |
| 159 // Since this is used for serialization of messages read/written to a MP that | 212 // Since this is used for serialization of messages read/written to a MP that |
| 160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 213 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
| 161 // them when a MP is being sent. As a result, even for POSIX we will probably | 214 // them when a MP is being sent. As a result, even for POSIX we will probably |
| 162 // want to send the handles to the shell process and exchange them for tokens | 215 // want to send the handles to the shell process and exchange them for tokens |
| 163 // (since we can be sure that the shell will respond to our IPCs, compared to | 216 // (since we can be sure that the shell will respond to our IPCs, compared to |
| 164 // the other end where we're sending the MP to, which may not be reading...). | 217 // the other end where we're sending the MP to, which may not be reading...). |
| 165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | 218 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
| (...skipping 13 matching lines...) Expand all Loading... |
| 179 const void* source, | 232 const void* source, |
| 180 size_t size, | 233 size_t size, |
| 181 PlatformHandleVector* platform_handles) { | 234 PlatformHandleVector* platform_handles) { |
| 182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { | 235 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { |
| 183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; | 236 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; |
| 184 return nullptr; | 237 return nullptr; |
| 185 } | 238 } |
| 186 | 239 |
| 187 const SerializedMessagePipeHandleDispatcher* serialization = | 240 const SerializedMessagePipeHandleDispatcher* serialization = |
| 188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); | 241 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
| 242 |
| 243 scoped_refptr<MessagePipeDispatcher> rv( |
| 244 new MessagePipeDispatcher(serialization->transferable)); |
| 245 if (!rv->transferable_) { |
| 246 rv->InitNonTransferable(serialization->pipe_id); |
| 247 return rv; |
| 248 } |
| 249 |
| 189 if (serialization->shared_memory_size != | 250 if (serialization->shared_memory_size != |
| 190 (serialization->serialized_read_buffer_size + | 251 (serialization->serialized_read_buffer_size + |
| 191 serialization->serialized_write_buffer_size + | 252 serialization->serialized_write_buffer_size + |
| 192 serialization->serialized_message_queue_size)) { | 253 serialization->serialized_message_queue_size)) { |
| 193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; | 254 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; |
| 194 return nullptr; | 255 return nullptr; |
| 195 } | 256 } |
| 196 | 257 |
| 197 ScopedPlatformHandle platform_handle, shared_memory_handle; | 258 ScopedPlatformHandle platform_handle, shared_memory_handle; |
| 198 if (!GetHandle(serialization->platform_handle_index, | 259 if (!GetHandle(serialization->platform_handle_index, |
| (...skipping 27 matching lines...) Expand all Loading... |
| 226 serialization->serialized_write_buffer_size; | 287 serialization->serialized_write_buffer_size; |
| 227 buffer += serialized_write_buffer_size; | 288 buffer += serialized_write_buffer_size; |
| 228 } | 289 } |
| 229 if (serialization->serialized_message_queue_size) { | 290 if (serialization->serialized_message_queue_size) { |
| 230 message_queue_data = buffer; | 291 message_queue_data = buffer; |
| 231 message_queue_size = serialization->serialized_message_queue_size; | 292 message_queue_size = serialization->serialized_message_queue_size; |
| 232 buffer += message_queue_size; | 293 buffer += message_queue_size; |
| 233 } | 294 } |
| 234 } | 295 } |
| 235 | 296 |
| 236 scoped_refptr<MessagePipeDispatcher> rv( | |
| 237 Create(MessagePipeDispatcher::kDefaultCreateOptions)); | |
| 238 rv->write_error_ = serialization->write_error; | 297 rv->write_error_ = serialization->write_error; |
| 239 | 298 |
| 240 std::vector<int> serialized_read_fds; | 299 std::vector<int> serialized_read_fds; |
| 241 std::vector<int> serialized_write_fds; | 300 std::vector<int> serialized_write_fds; |
| 242 #if defined(OS_POSIX) | 301 #if defined(OS_POSIX) |
| 243 std::vector<int> serialized_fds; | 302 std::vector<int> serialized_fds; |
| 244 size_t serialized_fds_index = 0; | 303 size_t serialized_fds_index = 0; |
| 245 | 304 |
| 246 size_t total_fd_count = serialization->serialized_read_fds_length + | 305 size_t total_fd_count = serialization->serialized_read_fds_length + |
| 247 serialization->serialized_write_fds_length + | 306 serialization->serialized_write_fds_length + |
| (...skipping 14 matching lines...) Expand all Loading... |
| 262 serialized_fds_index += serialization->serialized_read_fds_length; | 321 serialized_fds_index += serialization->serialized_read_fds_length; |
| 263 serialized_write_fds.assign( | 322 serialized_write_fds.assign( |
| 264 serialized_fds.begin() + serialized_fds_index, | 323 serialized_fds.begin() + serialized_fds_index, |
| 265 serialized_fds.begin() + serialized_fds_index + | 324 serialized_fds.begin() + serialized_fds_index + |
| 266 serialization->serialized_write_fds_length); | 325 serialization->serialized_write_fds_length); |
| 267 serialized_fds_index += serialization->serialized_write_fds_length; | 326 serialized_fds_index += serialization->serialized_write_fds_length; |
| 268 #endif | 327 #endif |
| 269 | 328 |
| 270 while (message_queue_size) { | 329 while (message_queue_size) { |
| 271 size_t message_size; | 330 size_t message_size; |
| 272 CHECK(MessageInTransit::GetNextMessageSize( | 331 if (!MessageInTransit::GetNextMessageSize( |
| 273 message_queue_data, message_queue_size, &message_size)); | 332 message_queue_data, message_queue_size, &message_size)) { |
| 333 NOTREACHED() << "Couldn't read message size from serialized data."; |
| 334 return nullptr; |
| 335 } |
| 336 if (message_size > message_queue_size) { |
| 337 NOTREACHED() << "Invalid serialized message size."; |
| 338 return nullptr; |
| 339 } |
| 274 MessageInTransit::View message_view(message_size, message_queue_data); | 340 MessageInTransit::View message_view(message_size, message_queue_data); |
| 275 message_queue_size -= message_size; | 341 message_queue_size -= message_size; |
| 276 message_queue_data += message_size; | 342 message_queue_data += message_size; |
| 277 | 343 |
| 278 // TODO(jam): Copied below from RawChannelWin. See commment above | 344 // TODO(jam): Copied below from RawChannelWin. See commment above |
| 279 // GetReadPlatformHandles. | 345 // GetReadPlatformHandles. |
| 280 ScopedPlatformHandleVectorPtr temp_platform_handles; | 346 ScopedPlatformHandleVectorPtr temp_platform_handles; |
| 281 if (message_view.transport_data_buffer()) { | 347 if (message_view.transport_data_buffer()) { |
| 282 size_t num_platform_handles; | 348 size_t num_platform_handles; |
| 283 const void* platform_handle_table; | 349 const void* platform_handle_table; |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 326 &serialized_write_fds); | 392 &serialized_write_fds); |
| 327 | 393 |
| 328 if (message_queue_size) { // Should be empty by now. | 394 if (message_queue_size) { // Should be empty by now. |
| 329 LOG(ERROR) << "Invalid queued messages"; | 395 LOG(ERROR) << "Invalid queued messages"; |
| 330 return nullptr; | 396 return nullptr; |
| 331 } | 397 } |
| 332 | 398 |
| 333 return rv; | 399 return rv; |
| 334 } | 400 } |
| 335 | 401 |
| 336 MessagePipeDispatcher::MessagePipeDispatcher() | 402 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
| 337 : channel_(nullptr), | 403 : channel_(nullptr), |
| 338 serialized_(false), | |
| 339 serialized_read_fds_length_(0u), | 404 serialized_read_fds_length_(0u), |
| 340 serialized_write_fds_length_(0u), | 405 serialized_write_fds_length_(0u), |
| 341 serialized_message_fds_length_(0u), | 406 serialized_message_fds_length_(0u), |
| 407 pipe_id_(0), |
| 408 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
| 409 serialized_(false), |
| 342 calling_init_(false), | 410 calling_init_(false), |
| 343 write_error_(false) { | 411 write_error_(false), |
| 412 transferable_(transferable) { |
| 344 } | 413 } |
| 345 | 414 |
| 346 MessagePipeDispatcher::~MessagePipeDispatcher() { | 415 MessagePipeDispatcher::~MessagePipeDispatcher() { |
| 347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 416 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
| 348 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 417 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
| 349 // down and so when it was deleting pending tasks it caused the last reference | 418 // down and so when it was deleting pending tasks it caused the last reference |
| 350 // to destruct this object. In that case, safe to destroy the channel. | 419 // to destruct this object. In that case, safe to destroy the channel. |
| 351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 420 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
| 352 channel_->Shutdown(); | 421 channel_->Shutdown(); |
| 353 else | 422 else |
| 354 DCHECK(!channel_); | 423 DCHECK(!channel_); |
| 355 #if defined(OS_POSIX) | 424 #if defined(OS_POSIX) |
| 356 ClosePlatformHandles(&serialized_fds_); | 425 ClosePlatformHandles(&serialized_fds_); |
| 357 #endif | 426 #endif |
| 358 } | 427 } |
| 359 | 428 |
| 360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { | 429 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
| 361 lock().AssertAcquired(); | 430 lock().AssertAcquired(); |
| 362 awakable_list_.CancelAll(); | 431 awakable_list_.CancelAll(); |
| 363 } | 432 } |
| 364 | 433 |
| 365 void MessagePipeDispatcher::CloseImplNoLock() { | 434 void MessagePipeDispatcher::CloseImplNoLock() { |
| 366 lock().AssertAcquired(); | 435 lock().AssertAcquired(); |
| 367 internal::g_io_thread_task_runner->PostTask( | 436 internal::g_io_thread_task_runner->PostTask( |
| 368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 437 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
| 369 } | 438 } |
| 370 | 439 |
| 371 void MessagePipeDispatcher::SerializeInternal() { | 440 void MessagePipeDispatcher::SerializeInternal() { |
| 441 serialized_ = true; |
| 442 if (!transferable_) { |
| 443 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 444 << "Non transferable message pipe being sent after read/write. " |
| 445 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
| 446 << "the pipe can be sent after it's read or written."; |
| 447 non_transferable_state_ = SERIALISED; |
| 448 return; |
| 449 } |
| 450 |
| 372 // We need to stop watching handle immediately, even though not on IO thread, | 451 // We need to stop watching handle immediately, even though not on IO thread, |
| 373 // so that other messages aren't read after this. | 452 // so that other messages aren't read after this. |
| 374 std::vector<int> serialized_read_fds, serialized_write_fds; | 453 std::vector<int> serialized_read_fds, serialized_write_fds; |
| 375 if (channel_) { | 454 if (channel_) { |
| 376 bool write_error = false; | 455 bool write_error = false; |
| 377 | 456 |
| 378 serialized_platform_handle_ = channel_->ReleaseHandle( | 457 serialized_platform_handle_ = channel_->ReleaseHandle( |
| 379 &serialized_read_buffer_, &serialized_write_buffer_, | 458 &serialized_read_buffer_, &serialized_write_buffer_, |
| 380 &serialized_read_fds, &serialized_write_fds, &write_error); | 459 &serialized_read_fds, &serialized_write_fds, &write_error); |
| 381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), | 460 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), |
| 382 serialized_read_fds.end()); | 461 serialized_read_fds.end()); |
| 383 serialized_read_fds_length_ = serialized_read_fds.size(); | 462 serialized_read_fds_length_ = serialized_read_fds.size(); |
| 384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), | 463 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), |
| 385 serialized_write_fds.end()); | 464 serialized_write_fds.end()); |
| 386 serialized_write_fds_length_ = serialized_write_fds.size(); | 465 serialized_write_fds_length_ = serialized_write_fds.size(); |
| 387 channel_ = nullptr; | 466 channel_ = nullptr; |
| 388 if (write_error) | |
| 389 write_error = true; | |
| 390 } else { | 467 } else { |
| 391 // It's valid that the other side wrote some data and closed its end. | 468 // It's valid that the other side wrote some data and closed its end. |
| 392 } | 469 } |
| 393 | 470 |
| 394 DCHECK(serialized_message_queue_.empty()); | 471 DCHECK(serialized_message_queue_.empty()); |
| 395 while (!message_queue_.IsEmpty()) { | 472 while (!message_queue_.IsEmpty()) { |
| 396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); | 473 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
| 397 | 474 |
| 398 // When MojoWriteMessage is called, the MessageInTransit doesn't have | 475 // When MojoWriteMessage is called, the MessageInTransit doesn't have |
| 399 // dispatchers set and CreateEquivaent... is called since the dispatchers | 476 // dispatchers set and CreateEquivaent... is called since the dispatchers |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 434 &all_platform_handles->at(0), all_platform_handles->size(), tokens); | 511 &all_platform_handles->at(0), all_platform_handles->size(), tokens); |
| 435 for (size_t i = 0; i < all_platform_handles->size(); i++) | 512 for (size_t i = 0; i < all_platform_handles->size(); i++) |
| 436 all_platform_handles->at(i) = PlatformHandle(); | 513 all_platform_handles->at(i) = PlatformHandle(); |
| 437 #else | 514 #else |
| 438 for (size_t i = 0; i < all_platform_handles->size(); i++) { | 515 for (size_t i = 0; i < all_platform_handles->size(); i++) { |
| 439 serialized_fds_.push_back(all_platform_handles->at(i).fd); | 516 serialized_fds_.push_back(all_platform_handles->at(i).fd); |
| 440 serialized_message_fds_length_++; | 517 serialized_message_fds_length_++; |
| 441 all_platform_handles->at(i) = PlatformHandle(); | 518 all_platform_handles->at(i) = PlatformHandle(); |
| 442 } | 519 } |
| 443 #endif | 520 #endif |
| 521 } |
| 444 | 522 |
| 445 serialized_message_queue_.insert( | 523 serialized_message_queue_.insert( |
| 446 serialized_message_queue_.end(), | 524 serialized_message_queue_.end(), |
| 447 static_cast<const char*>(message->transport_data()->buffer()), | 525 static_cast<const char*>(message->transport_data()->buffer()), |
| 448 static_cast<const char*>(message->transport_data()->buffer()) + | 526 static_cast<const char*>(message->transport_data()->buffer()) + |
| 449 transport_data_buffer_size); | 527 transport_data_buffer_size); |
| 450 } | |
| 451 } | 528 } |
| 452 | 529 |
| 453 for (size_t i = 0; i < dispatchers.size(); ++i) | 530 for (size_t i = 0; i < dispatchers.size(); ++i) |
| 454 dispatchers[i]->TransportEnded(); | 531 dispatchers[i]->TransportEnded(); |
| 455 } | 532 } |
| 456 | |
| 457 serialized_ = true; | |
| 458 } | 533 } |
| 459 | 534 |
| 460 scoped_refptr<Dispatcher> | 535 scoped_refptr<Dispatcher> |
| 461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 536 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| 462 lock().AssertAcquired(); | 537 lock().AssertAcquired(); |
| 463 | 538 |
| 464 SerializeInternal(); | 539 SerializeInternal(); |
| 465 | 540 |
| 466 // TODO(vtl): Currently, there are no options, so we just use | 541 scoped_refptr<MessagePipeDispatcher> rv( |
| 467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | 542 new MessagePipeDispatcher(transferable_)); |
| 468 // too. | |
| 469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); | |
| 470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); | |
| 471 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
| 472 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
| 473 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
| 474 serialized_fds_.swap(rv->serialized_fds_); | |
| 475 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
| 476 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
| 477 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
| 478 rv->serialized_ = true; | 543 rv->serialized_ = true; |
| 479 rv->write_error_ = write_error_; | 544 if (transferable_) { |
| 480 return scoped_refptr<Dispatcher>(rv.get()); | 545 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
| 546 serialized_message_queue_.swap(rv->serialized_message_queue_); |
| 547 serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| 548 serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
| 549 serialized_fds_.swap(rv->serialized_fds_); |
| 550 rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
| 551 rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
| 552 rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
| 553 rv->write_error_ = write_error_; |
| 554 } else { |
| 555 rv->pipe_id_ = pipe_id_; |
| 556 rv->non_transferable_state_ = non_transferable_state_; |
| 557 } |
| 558 return rv; |
| 481 } | 559 } |
| 482 | 560 |
| 483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | 561 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
| 484 const void* bytes, | 562 const void* bytes, |
| 485 uint32_t num_bytes, | 563 uint32_t num_bytes, |
| 486 std::vector<DispatcherTransport>* transports, | 564 std::vector<DispatcherTransport>* transports, |
| 487 MojoWriteMessageFlags flags) { | 565 MojoWriteMessageFlags flags) { |
| 566 lock().AssertAcquired(); |
| 488 | 567 |
| 489 DCHECK(!transports || | 568 DCHECK(!transports || |
| 490 (transports->size() > 0 && | 569 (transports->size() > 0 && |
| 491 transports->size() <= GetConfiguration().max_message_num_handles)); | 570 transports->size() <= GetConfiguration().max_message_num_handles)); |
| 492 | 571 |
| 493 lock().AssertAcquired(); | 572 if (write_error_ || |
| 494 | 573 (transferable_ && !channel_) || |
| 495 if (!channel_ || write_error_) | 574 (!transferable_ && non_transferable_state_ == CLOSED)) { |
| 496 return MOJO_RESULT_FAILED_PRECONDITION; | 575 return MOJO_RESULT_FAILED_PRECONDITION; |
| 576 } |
| 497 | 577 |
| 498 if (num_bytes > GetConfiguration().max_message_num_bytes) | 578 if (num_bytes > GetConfiguration().max_message_num_bytes) |
| 499 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 579 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 500 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 580 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); | 581 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
| 502 if (transports) { | 582 if (transports) { |
| 503 MojoResult result = AttachTransportsNoLock(message.get(), transports); | 583 MojoResult result = AttachTransportsNoLock(message.get(), transports); |
| 504 if (result != MOJO_RESULT_OK) | 584 if (result != MOJO_RESULT_OK) |
| 505 return result; | 585 return result; |
| 506 } | 586 } |
| 507 | 587 |
| 508 message->SerializeAndCloseDispatchers(); | 588 message->SerializeAndCloseDispatchers(); |
| 509 channel_->WriteMessage(message.Pass()); | 589 if (!transferable_) |
| 590 message->set_route_id(pipe_id_); |
| 591 if (!transferable_ && |
| 592 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
| 593 non_transferable_state_ == CONNECT_CALLED)) { |
| 594 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 595 RequestNontransferableChannel(); |
| 596 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); |
| 597 } else { |
| 598 channel_->WriteMessage(message.Pass()); |
| 599 } |
| 510 | 600 |
| 511 return MOJO_RESULT_OK; | 601 return MOJO_RESULT_OK; |
| 512 } | 602 } |
| 513 | 603 |
| 514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 604 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
| 515 void* bytes, | 605 void* bytes, |
| 516 uint32_t* num_bytes, | 606 uint32_t* num_bytes, |
| 517 DispatcherVector* dispatchers, | 607 DispatcherVector* dispatchers, |
| 518 uint32_t* num_dispatchers, | 608 uint32_t* num_dispatchers, |
| 519 MojoReadMessageFlags flags) { | 609 MojoReadMessageFlags flags) { |
| 520 lock().AssertAcquired(); | 610 lock().AssertAcquired(); |
| 521 if (channel_) | 611 if (channel_) { |
| 522 channel_->EnsureLazyInitialized(); | 612 channel_->EnsureLazyInitialized(); |
| 613 } else if (!transferable_) { |
| 614 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 615 RequestNontransferableChannel(); |
| 616 return MOJO_RESULT_SHOULD_WAIT; |
| 617 } else if (non_transferable_state_ == CONNECT_CALLED) { |
| 618 return MOJO_RESULT_SHOULD_WAIT; |
| 619 } |
| 620 } |
| 621 |
| 523 DCHECK(!dispatchers || dispatchers->empty()); | 622 DCHECK(!dispatchers || dispatchers->empty()); |
| 524 | 623 |
| 525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; | 624 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
| 526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | 625 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
| 527 | 626 |
| 528 if (message_queue_.IsEmpty()) | 627 if (message_queue_.IsEmpty()) |
| 529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; | 628 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; |
| 530 | 629 |
| 531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | 630 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
| 532 // and release the lock immediately. | 631 // and release the lock immediately. |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 576 return MOJO_RESULT_OK; | 675 return MOJO_RESULT_OK; |
| 577 } | 676 } |
| 578 | 677 |
| 579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | 678 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
| 580 const { | 679 const { |
| 581 lock().AssertAcquired(); | 680 lock().AssertAcquired(); |
| 582 | 681 |
| 583 HandleSignalsState rv; | 682 HandleSignalsState rv; |
| 584 if (!message_queue_.IsEmpty()) | 683 if (!message_queue_.IsEmpty()) |
| 585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 684 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 586 if (channel_ || !message_queue_.IsEmpty()) | 685 if (!message_queue_.IsEmpty() || |
| 686 (transferable_ && channel_) || |
| 687 (!transferable_ && non_transferable_state_ != CLOSED)) |
| 587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 588 if (channel_ && !write_error_) { | 689 if (!write_error_ && |
| 690 ((transferable_ && channel_) || |
| 691 (!transferable_ && non_transferable_state_ != CLOSED))) { |
| 589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 692 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 693 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 591 } | 694 } |
| 592 if (!channel_ || write_error_) | 695 if (write_error_ || |
| 696 (transferable_ && !channel_) || |
| 697 (!transferable_ && |
| 698 ((non_transferable_state_ == CLOSED) || is_closed()))) { |
| 593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 699 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 700 } |
| 594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 701 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 595 return rv; | 702 return rv; |
| 596 } | 703 } |
| 597 | 704 |
| 598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( | 705 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
| 599 Awakable* awakable, | 706 Awakable* awakable, |
| 600 MojoHandleSignals signals, | 707 MojoHandleSignals signals, |
| 601 uintptr_t context, | 708 uintptr_t context, |
| 602 HandleSignalsState* signals_state) { | 709 HandleSignalsState* signals_state) { |
| 603 lock().AssertAcquired(); | 710 lock().AssertAcquired(); |
| 604 if (channel_) | 711 if (channel_) { |
| 605 channel_->EnsureLazyInitialized(); | 712 channel_->EnsureLazyInitialized(); |
| 713 } else if (!transferable_ && |
| 714 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 715 RequestNontransferableChannel(); |
| 716 } |
| 717 |
| 606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 718 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| 607 if (state.satisfies(signals)) { | 719 if (state.satisfies(signals)) { |
| 608 if (signals_state) | 720 if (signals_state) |
| 609 *signals_state = state; | 721 *signals_state = state; |
| 610 return MOJO_RESULT_ALREADY_EXISTS; | 722 return MOJO_RESULT_ALREADY_EXISTS; |
| 611 } | 723 } |
| 612 if (!state.can_satisfy(signals)) { | 724 if (!state.can_satisfy(signals)) { |
| 613 if (signals_state) | 725 if (signals_state) |
| 614 *signals_state = state; | 726 *signals_state = state; |
| 615 return MOJO_RESULT_FAILED_PRECONDITION; | 727 return MOJO_RESULT_FAILED_PRECONDITION; |
| (...skipping 30 matching lines...) Expand all Loading... |
| 646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); | 758 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); |
| 647 } | 759 } |
| 648 | 760 |
| 649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | 761 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
| 650 void* destination, | 762 void* destination, |
| 651 size_t* actual_size, | 763 size_t* actual_size, |
| 652 PlatformHandleVector* platform_handles) { | 764 PlatformHandleVector* platform_handles) { |
| 653 CloseImplNoLock(); | 765 CloseImplNoLock(); |
| 654 SerializedMessagePipeHandleDispatcher* serialization = | 766 SerializedMessagePipeHandleDispatcher* serialization = |
| 655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); | 767 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
| 768 serialization->transferable = transferable_; |
| 769 serialization->pipe_id = pipe_id_; |
| 656 if (serialized_platform_handle_.is_valid()) { | 770 if (serialized_platform_handle_.is_valid()) { |
| 657 serialization->platform_handle_index = platform_handles->size(); | 771 serialization->platform_handle_index = platform_handles->size(); |
| 658 platform_handles->push_back(serialized_platform_handle_.release()); | 772 platform_handles->push_back(serialized_platform_handle_.release()); |
| 659 } else { | 773 } else { |
| 660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; | 774 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
| 661 } | 775 } |
| 662 | 776 |
| 663 serialization->write_error = write_error_; | 777 serialization->write_error = write_error_; |
| 664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); | 778 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); |
| 665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); | 779 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 789 break; | 903 break; |
| 790 } | 904 } |
| 791 | 905 |
| 792 if (started_transport_.Try()) { | 906 if (started_transport_.Try()) { |
| 793 base::AutoLock locker(lock()); | 907 base::AutoLock locker(lock()); |
| 794 // We can get two OnError callbacks before the post task below completes. | 908 // We can get two OnError callbacks before the post task below completes. |
| 795 // Although RawChannel still has a pointer to this object until Shutdown is | 909 // Although RawChannel still has a pointer to this object until Shutdown is |
| 796 // called, that is safe since this class always does a PostTask to the IO | 910 // called, that is safe since this class always does a PostTask to the IO |
| 797 // thread to self destruct. | 911 // thread to self destruct. |
| 798 if (channel_ && error != ERROR_WRITE) { | 912 if (channel_ && error != ERROR_WRITE) { |
| 799 channel_->Shutdown(); | 913 if (transferable_) { |
| 914 channel_->Shutdown(); |
| 915 } else { |
| 916 CHECK_NE(non_transferable_state_, CLOSED); |
| 917 // Since we're in a callback from the Broker, call it asynchronously. |
| 918 internal::g_io_thread_task_runner->PostTask( |
| 919 FROM_HERE, |
| 920 base::Bind(&Broker::CloseMessagePipe, |
| 921 base::Unretained(internal::g_broker), pipe_id_, |
| 922 base::Unretained(this))); |
| 923 non_transferable_state_ = CLOSED; |
| 924 } |
| 800 channel_ = nullptr; | 925 channel_ = nullptr; |
| 801 } | 926 } |
| 802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 927 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 803 started_transport_.Release(); | 928 started_transport_.Release(); |
| 804 } else { | 929 } else { |
| 805 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 930 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| 806 } | 931 } |
| 807 } | 932 } |
| 808 | 933 |
| 809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 934 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
| 810 MessageInTransit* message, | 935 MessageInTransit* message, |
| 811 std::vector<DispatcherTransport>* transports) { | 936 std::vector<DispatcherTransport>* transports) { |
| 812 DCHECK(!message->has_dispatchers()); | 937 DCHECK(!message->has_dispatchers()); |
| 813 | 938 |
| 814 // You're not allowed to send either handle to a message pipe over the message | 939 // You're not allowed to send either handle to a message pipe over the message |
| 815 // pipe, so check for this. (The case of trying to write a handle to itself is | 940 // pipe, so check for this. (The case of trying to write a handle to itself is |
| 816 // taken care of by |Core|. That case kind of makes sense, but leads to | 941 // taken care of by |Core|. That case kind of makes sense, but leads to |
| 817 // complications if, e.g., both sides try to do the same thing with their | 942 // complications if, e.g., both sides try to do the same thing with their |
| 818 // respective handles simultaneously. The other case, of trying to write the | 943 // respective handles simultaneously. The other case, of trying to write the |
| 819 // peer handle to a handle, doesn't make sense -- since no handle will be | 944 // peer handle to a handle, doesn't make sense -- since no handle will be |
| 820 // available to read the message from.) | 945 // available to read the message from.) |
| 821 for (size_t i = 0; i < transports->size(); i++) { | 946 for (size_t i = 0; i < transports->size(); i++) { |
| 822 if (!(*transports)[i].is_valid()) | 947 if (!(*transports)[i].is_valid()) |
| 823 continue; | 948 continue; |
| 824 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { | 949 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { |
| 825 MessagePipeDispatcher* mp = | 950 MessagePipeDispatcher* mp = |
| 826 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); | 951 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); |
| 827 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { | 952 if (transferable_ && mp->transferable_ && |
| 953 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { |
| 828 // The other case should have been disallowed by |Core|. (Note: |port| | 954 // The other case should have been disallowed by |Core|. (Note: |port| |
| 829 // is the peer port of the handle given to |WriteMessage()|.) | 955 // is the peer port of the handle given to |WriteMessage()|.) |
| 830 return MOJO_RESULT_INVALID_ARGUMENT; | 956 return MOJO_RESULT_INVALID_ARGUMENT; |
| 957 } else if (!transferable_ && !mp->transferable_ && |
| 958 pipe_id_ == mp->pipe_id_) { |
| 959 return MOJO_RESULT_INVALID_ARGUMENT; |
| 831 } | 960 } |
| 832 } | 961 } |
| 833 } | 962 } |
| 834 | 963 |
| 835 // Clone the dispatchers and attach them to the message. (This must be done as | 964 // Clone the dispatchers and attach them to the message. (This must be done as |
| 836 // a separate loop, since we want to leave the dispatchers alone on failure.) | 965 // a separate loop, since we want to leave the dispatchers alone on failure.) |
| 837 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | 966 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
| 838 dispatchers->reserve(transports->size()); | 967 dispatchers->reserve(transports->size()); |
| 839 for (size_t i = 0; i < transports->size(); i++) { | 968 for (size_t i = 0; i < transports->size(); i++) { |
| 840 if ((*transports)[i].is_valid()) { | 969 if ((*transports)[i].is_valid()) { |
| 841 dispatchers->push_back( | 970 dispatchers->push_back( |
| 842 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 971 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
| 843 } else { | 972 } else { |
| 844 LOG(WARNING) << "Enqueueing null dispatcher"; | 973 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 845 dispatchers->push_back(nullptr); | 974 dispatchers->push_back(nullptr); |
| 846 } | 975 } |
| 847 } | 976 } |
| 848 message->SetDispatchers(dispatchers.Pass()); | 977 message->SetDispatchers(dispatchers.Pass()); |
| 849 return MOJO_RESULT_OK; | 978 return MOJO_RESULT_OK; |
| 850 } | 979 } |
| 851 | 980 |
| 981 void MessagePipeDispatcher::RequestNontransferableChannel() { |
| 982 lock().AssertAcquired(); |
| 983 CHECK(!transferable_); |
| 984 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
| 985 non_transferable_state_ = CONNECT_CALLED; |
| 986 |
| 987 // PostTask since the broker can call us back synchronously. |
| 988 internal::g_io_thread_task_runner->PostTask( |
| 989 FROM_HERE, |
| 990 base::Bind(&Broker::ConnectMessagePipe, |
| 991 base::Unretained(internal::g_broker), pipe_id_, |
| 992 base::Unretained(this))); |
| 993 } |
| 994 |
| 852 } // namespace edk | 995 } // namespace edk |
| 853 } // namespace mojo | 996 } // namespace mojo |
| OLD | NEW |