OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/logging.h" | 8 #include "base/logging.h" |
9 #include "base/message_loop/message_loop.h" | 9 #include "base/message_loop/message_loop.h" |
10 #include "mojo/edk/embedder/embedder_internal.h" | 10 #include "mojo/edk/embedder/embedder_internal.h" |
11 #include "mojo/edk/embedder/platform_handle_utils.h" | 11 #include "mojo/edk/embedder/platform_handle_utils.h" |
12 #include "mojo/edk/embedder/platform_shared_buffer.h" | 12 #include "mojo/edk/embedder/platform_shared_buffer.h" |
13 #include "mojo/edk/embedder/platform_support.h" | 13 #include "mojo/edk/embedder/platform_support.h" |
14 #include "mojo/edk/system/broker.h" | 14 #include "mojo/edk/system/broker.h" |
15 #include "mojo/edk/system/configuration.h" | 15 #include "mojo/edk/system/configuration.h" |
16 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
17 #include "mojo/edk/system/options_validation.h" | 17 #include "mojo/edk/system/options_validation.h" |
18 #include "mojo/edk/system/transport_data.h" | 18 #include "mojo/edk/system/transport_data.h" |
19 | 19 |
20 namespace mojo { | 20 namespace mojo { |
21 namespace edk { | 21 namespace edk { |
22 | 22 |
23 namespace { | 23 namespace { |
24 | 24 |
25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); | 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
26 | 26 |
27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { | 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
| 28 bool transferable; |
| 29 bool write_error; |
| 30 uint64_t pipe_id; // If transferable is false. |
| 31 // The following members are only set if transferable is true. |
28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP | 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
29 // was closed. | 33 // was closed. |
30 size_t platform_handle_index; | 34 size_t platform_handle_index; |
31 bool write_error; | |
32 | 35 |
33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) | 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
34 uint32_t shared_memory_size; | 37 uint32_t shared_memory_size; |
35 | 38 |
36 size_t serialized_read_buffer_size; | 39 size_t serialized_read_buffer_size; |
37 size_t serialized_write_buffer_size; | 40 size_t serialized_write_buffer_size; |
38 size_t serialized_message_queue_size; | 41 size_t serialized_message_queue_size; |
39 | 42 |
40 // These are the FDs required as part of serializing channel_ and | 43 // These are the FDs required as part of serializing channel_ and |
41 // message_queue_. This is only used on POSIX. | 44 // message_queue_. This is only used on POSIX. |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
83 | 86 |
84 const MojoCreateMessagePipeOptions | 87 const MojoCreateMessagePipeOptions |
85 MessagePipeDispatcher::kDefaultCreateOptions = { | 88 MessagePipeDispatcher::kDefaultCreateOptions = { |
86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
88 | 91 |
89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
90 const MojoCreateMessagePipeOptions* in_options, | 93 const MojoCreateMessagePipeOptions* in_options, |
91 MojoCreateMessagePipeOptions* out_options) { | 94 MojoCreateMessagePipeOptions* out_options) { |
92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
| 97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; |
94 | 98 |
95 *out_options = kDefaultCreateOptions; | 99 *out_options = kDefaultCreateOptions; |
96 if (!in_options) | 100 if (!in_options) |
97 return MOJO_RESULT_OK; | 101 return MOJO_RESULT_OK; |
98 | 102 |
99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
100 if (!reader.is_valid()) | 104 if (!reader.is_valid()) |
101 return MOJO_RESULT_INVALID_ARGUMENT; | 105 return MOJO_RESULT_INVALID_ARGUMENT; |
102 | 106 |
103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
104 return MOJO_RESULT_OK; | 108 return MOJO_RESULT_OK; |
105 if ((reader.options().flags & ~kKnownFlags)) | 109 if ((reader.options().flags & ~kKnownFlags)) |
106 return MOJO_RESULT_UNIMPLEMENTED; | 110 return MOJO_RESULT_UNIMPLEMENTED; |
107 out_options->flags = reader.options().flags; | 111 out_options->flags = reader.options().flags; |
108 | 112 |
109 // Checks for fields beyond |flags|: | 113 // Checks for fields beyond |flags|: |
110 | 114 |
111 // (Nothing here yet.) | 115 // (Nothing here yet.) |
112 | 116 |
113 return MOJO_RESULT_OK; | 117 return MOJO_RESULT_OK; |
114 } | 118 } |
115 | 119 |
116 void MessagePipeDispatcher::Init( | 120 void MessagePipeDispatcher::Init( |
117 ScopedPlatformHandle message_pipe, | 121 ScopedPlatformHandle message_pipe, |
118 char* serialized_read_buffer, size_t serialized_read_buffer_size, | 122 char* serialized_read_buffer, size_t serialized_read_buffer_size, |
119 char* serialized_write_buffer, size_t serialized_write_buffer_size, | 123 char* serialized_write_buffer, size_t serialized_write_buffer_size, |
120 std::vector<int>* serialized_read_fds, | 124 std::vector<int>* serialized_read_fds, |
121 std::vector<int>* serialized_write_fds) { | 125 std::vector<int>* serialized_write_fds) { |
| 126 CHECK(transferable_); |
122 if (message_pipe.get().is_valid()) { | 127 if (message_pipe.get().is_valid()) { |
123 channel_ = RawChannel::Create(message_pipe.Pass()); | 128 channel_ = RawChannel::Create(message_pipe.Pass()); |
124 | 129 |
125 // TODO(jam): It's probably cleaner to pass this in Init call. | 130 // TODO(jam): It's probably cleaner to pass this in Init call. |
126 channel_->SetSerializedData( | 131 channel_->SetSerializedData( |
127 serialized_read_buffer, serialized_read_buffer_size, | 132 serialized_read_buffer, serialized_read_buffer_size, |
128 serialized_write_buffer, serialized_write_buffer_size, | 133 serialized_write_buffer, serialized_write_buffer_size, |
129 serialized_read_fds, serialized_write_fds); | 134 serialized_read_fds, serialized_write_fds); |
130 internal::g_io_thread_task_runner->PostTask( | 135 internal::g_io_thread_task_runner->PostTask( |
131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
132 } | 137 } |
133 } | 138 } |
134 | 139 |
| 140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { |
| 141 CHECK(!transferable_); |
| 142 pipe_id_ = pipe_id; |
| 143 } |
| 144 |
135 void MessagePipeDispatcher::InitOnIO() { | 145 void MessagePipeDispatcher::InitOnIO() { |
136 base::AutoLock locker(lock()); | 146 base::AutoLock locker(lock()); |
| 147 CHECK(transferable_); |
137 calling_init_ = true; | 148 calling_init_ = true; |
138 if (channel_) | 149 if (channel_) |
139 channel_->Init(this); | 150 channel_->Init(this); |
140 calling_init_ = false; | 151 calling_init_ = false; |
141 } | 152 } |
142 | 153 |
143 void MessagePipeDispatcher::CloseOnIO() { | 154 void MessagePipeDispatcher::CloseOnIO() { |
144 base::AutoLock locker(lock()); | 155 base::AutoLock locker(lock()); |
| 156 if (transferable_) { |
| 157 if (channel_) { |
| 158 channel_->Shutdown(); |
| 159 channel_ = nullptr; |
| 160 } |
| 161 } else { |
| 162 if (non_transferable_state_ == CONNECT_CALLED || |
| 163 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 164 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 165 RequestNontransferableChannel(); |
145 | 166 |
146 if (channel_) { | 167 // We can't cancel the pending request yet, since the other side of the |
147 channel_->Shutdown(); | 168 // message pipe would want to get pending outgoing messages (if any) or |
148 channel_ = nullptr; | 169 // at least know that this end was closed. So keep this object alive until |
| 170 // then. |
| 171 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
| 172 AddRef(); |
| 173 } else if (non_transferable_state_ == CONNECTED) { |
| 174 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| 175 non_transferable_state_ = CLOSED; |
| 176 channel_ = nullptr; |
| 177 } |
149 } | 178 } |
150 } | 179 } |
151 | 180 |
152 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 181 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
153 return Type::MESSAGE_PIPE; | 182 return Type::MESSAGE_PIPE; |
154 } | 183 } |
155 | 184 |
| 185 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| 186 base::AutoLock locker(lock()); |
| 187 channel_ = channel; |
| 188 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| 189 channel_->WriteMessage( |
| 190 non_transferable_outgoing_message_queue_.GetMessage()); |
| 191 } |
| 192 |
| 193 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
| 194 // We kept this object alive until it's connected, we can release it now. |
| 195 // Since we're in a callback from the Broker, call it asynchronously. |
| 196 internal::g_io_thread_task_runner->PostTask( |
| 197 FROM_HERE, |
| 198 base::Bind(&Broker::CloseMessagePipe, |
| 199 base::Unretained(internal::g_broker), pipe_id_, |
| 200 base::Unretained(this))); |
| 201 non_transferable_state_ = CLOSED; |
| 202 channel_ = nullptr; |
| 203 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| 204 } else { |
| 205 non_transferable_state_ = CONNECTED; |
| 206 } |
| 207 } |
| 208 |
156 #if defined(OS_WIN) | 209 #if defined(OS_WIN) |
157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 210 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
158 // best way we want to share this. | 211 // best way we want to share this. |
159 // Since this is used for serialization of messages read/written to a MP that | 212 // Since this is used for serialization of messages read/written to a MP that |
160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 213 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
161 // them when a MP is being sent. As a result, even for POSIX we will probably | 214 // them when a MP is being sent. As a result, even for POSIX we will probably |
162 // want to send the handles to the shell process and exchange them for tokens | 215 // want to send the handles to the shell process and exchange them for tokens |
163 // (since we can be sure that the shell will respond to our IPCs, compared to | 216 // (since we can be sure that the shell will respond to our IPCs, compared to |
164 // the other end where we're sending the MP to, which may not be reading...). | 217 // the other end where we're sending the MP to, which may not be reading...). |
165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | 218 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
(...skipping 13 matching lines...) Expand all Loading... |
179 const void* source, | 232 const void* source, |
180 size_t size, | 233 size_t size, |
181 PlatformHandleVector* platform_handles) { | 234 PlatformHandleVector* platform_handles) { |
182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { | 235 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { |
183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; | 236 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; |
184 return nullptr; | 237 return nullptr; |
185 } | 238 } |
186 | 239 |
187 const SerializedMessagePipeHandleDispatcher* serialization = | 240 const SerializedMessagePipeHandleDispatcher* serialization = |
188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); | 241 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
| 242 |
| 243 scoped_refptr<MessagePipeDispatcher> rv( |
| 244 new MessagePipeDispatcher(serialization->transferable)); |
| 245 if (!rv->transferable_) { |
| 246 rv->InitNonTransferable(serialization->pipe_id); |
| 247 return rv; |
| 248 } |
| 249 |
189 if (serialization->shared_memory_size != | 250 if (serialization->shared_memory_size != |
190 (serialization->serialized_read_buffer_size + | 251 (serialization->serialized_read_buffer_size + |
191 serialization->serialized_write_buffer_size + | 252 serialization->serialized_write_buffer_size + |
192 serialization->serialized_message_queue_size)) { | 253 serialization->serialized_message_queue_size)) { |
193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; | 254 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; |
194 return nullptr; | 255 return nullptr; |
195 } | 256 } |
196 | 257 |
197 ScopedPlatformHandle platform_handle, shared_memory_handle; | 258 ScopedPlatformHandle platform_handle, shared_memory_handle; |
198 if (!GetHandle(serialization->platform_handle_index, | 259 if (!GetHandle(serialization->platform_handle_index, |
(...skipping 27 matching lines...) Expand all Loading... |
226 serialization->serialized_write_buffer_size; | 287 serialization->serialized_write_buffer_size; |
227 buffer += serialized_write_buffer_size; | 288 buffer += serialized_write_buffer_size; |
228 } | 289 } |
229 if (serialization->serialized_message_queue_size) { | 290 if (serialization->serialized_message_queue_size) { |
230 message_queue_data = buffer; | 291 message_queue_data = buffer; |
231 message_queue_size = serialization->serialized_message_queue_size; | 292 message_queue_size = serialization->serialized_message_queue_size; |
232 buffer += message_queue_size; | 293 buffer += message_queue_size; |
233 } | 294 } |
234 } | 295 } |
235 | 296 |
236 scoped_refptr<MessagePipeDispatcher> rv( | |
237 Create(MessagePipeDispatcher::kDefaultCreateOptions)); | |
238 rv->write_error_ = serialization->write_error; | 297 rv->write_error_ = serialization->write_error; |
239 | 298 |
240 std::vector<int> serialized_read_fds; | 299 std::vector<int> serialized_read_fds; |
241 std::vector<int> serialized_write_fds; | 300 std::vector<int> serialized_write_fds; |
242 #if defined(OS_POSIX) | 301 #if defined(OS_POSIX) |
243 std::vector<int> serialized_fds; | 302 std::vector<int> serialized_fds; |
244 size_t serialized_fds_index = 0; | 303 size_t serialized_fds_index = 0; |
245 | 304 |
246 size_t total_fd_count = serialization->serialized_read_fds_length + | 305 size_t total_fd_count = serialization->serialized_read_fds_length + |
247 serialization->serialized_write_fds_length + | 306 serialization->serialized_write_fds_length + |
(...skipping 14 matching lines...) Expand all Loading... |
262 serialized_fds_index += serialization->serialized_read_fds_length; | 321 serialized_fds_index += serialization->serialized_read_fds_length; |
263 serialized_write_fds.assign( | 322 serialized_write_fds.assign( |
264 serialized_fds.begin() + serialized_fds_index, | 323 serialized_fds.begin() + serialized_fds_index, |
265 serialized_fds.begin() + serialized_fds_index + | 324 serialized_fds.begin() + serialized_fds_index + |
266 serialization->serialized_write_fds_length); | 325 serialization->serialized_write_fds_length); |
267 serialized_fds_index += serialization->serialized_write_fds_length; | 326 serialized_fds_index += serialization->serialized_write_fds_length; |
268 #endif | 327 #endif |
269 | 328 |
270 while (message_queue_size) { | 329 while (message_queue_size) { |
271 size_t message_size; | 330 size_t message_size; |
272 CHECK(MessageInTransit::GetNextMessageSize( | 331 if (!MessageInTransit::GetNextMessageSize( |
273 message_queue_data, message_queue_size, &message_size)); | 332 message_queue_data, message_queue_size, &message_size)) { |
| 333 NOTREACHED() << "Couldn't read message size from serialized data."; |
| 334 return nullptr; |
| 335 } |
| 336 if (message_size > message_queue_size) { |
| 337 NOTREACHED() << "Invalid serialized message size."; |
| 338 return nullptr; |
| 339 } |
274 MessageInTransit::View message_view(message_size, message_queue_data); | 340 MessageInTransit::View message_view(message_size, message_queue_data); |
275 message_queue_size -= message_size; | 341 message_queue_size -= message_size; |
276 message_queue_data += message_size; | 342 message_queue_data += message_size; |
277 | 343 |
278 // TODO(jam): Copied below from RawChannelWin. See commment above | 344 // TODO(jam): Copied below from RawChannelWin. See commment above |
279 // GetReadPlatformHandles. | 345 // GetReadPlatformHandles. |
280 ScopedPlatformHandleVectorPtr temp_platform_handles; | 346 ScopedPlatformHandleVectorPtr temp_platform_handles; |
281 if (message_view.transport_data_buffer()) { | 347 if (message_view.transport_data_buffer()) { |
282 size_t num_platform_handles; | 348 size_t num_platform_handles; |
283 const void* platform_handle_table; | 349 const void* platform_handle_table; |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
326 &serialized_write_fds); | 392 &serialized_write_fds); |
327 | 393 |
328 if (message_queue_size) { // Should be empty by now. | 394 if (message_queue_size) { // Should be empty by now. |
329 LOG(ERROR) << "Invalid queued messages"; | 395 LOG(ERROR) << "Invalid queued messages"; |
330 return nullptr; | 396 return nullptr; |
331 } | 397 } |
332 | 398 |
333 return rv; | 399 return rv; |
334 } | 400 } |
335 | 401 |
336 MessagePipeDispatcher::MessagePipeDispatcher() | 402 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
337 : channel_(nullptr), | 403 : channel_(nullptr), |
338 serialized_(false), | |
339 serialized_read_fds_length_(0u), | 404 serialized_read_fds_length_(0u), |
340 serialized_write_fds_length_(0u), | 405 serialized_write_fds_length_(0u), |
341 serialized_message_fds_length_(0u), | 406 serialized_message_fds_length_(0u), |
| 407 pipe_id_(0), |
| 408 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
| 409 serialized_(false), |
342 calling_init_(false), | 410 calling_init_(false), |
343 write_error_(false) { | 411 write_error_(false), |
| 412 transferable_(transferable) { |
344 } | 413 } |
345 | 414 |
346 MessagePipeDispatcher::~MessagePipeDispatcher() { | 415 MessagePipeDispatcher::~MessagePipeDispatcher() { |
347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 416 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
348 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 417 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
349 // down and so when it was deleting pending tasks it caused the last reference | 418 // down and so when it was deleting pending tasks it caused the last reference |
350 // to destruct this object. In that case, safe to destroy the channel. | 419 // to destruct this object. In that case, safe to destroy the channel. |
351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 420 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
352 channel_->Shutdown(); | 421 channel_->Shutdown(); |
353 else | 422 else |
354 DCHECK(!channel_); | 423 DCHECK(!channel_); |
355 #if defined(OS_POSIX) | 424 #if defined(OS_POSIX) |
356 ClosePlatformHandles(&serialized_fds_); | 425 ClosePlatformHandles(&serialized_fds_); |
357 #endif | 426 #endif |
358 } | 427 } |
359 | 428 |
360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { | 429 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
361 lock().AssertAcquired(); | 430 lock().AssertAcquired(); |
362 awakable_list_.CancelAll(); | 431 awakable_list_.CancelAll(); |
363 } | 432 } |
364 | 433 |
365 void MessagePipeDispatcher::CloseImplNoLock() { | 434 void MessagePipeDispatcher::CloseImplNoLock() { |
366 lock().AssertAcquired(); | 435 lock().AssertAcquired(); |
367 internal::g_io_thread_task_runner->PostTask( | 436 internal::g_io_thread_task_runner->PostTask( |
368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 437 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
369 } | 438 } |
370 | 439 |
371 void MessagePipeDispatcher::SerializeInternal() { | 440 void MessagePipeDispatcher::SerializeInternal() { |
| 441 serialized_ = true; |
| 442 if (!transferable_) { |
| 443 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 444 << "Non transferable message pipe being sent after read/write. " |
| 445 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
| 446 << "the pipe can be sent after it's read or written."; |
| 447 non_transferable_state_ = SERIALISED; |
| 448 return; |
| 449 } |
| 450 |
372 // We need to stop watching handle immediately, even though not on IO thread, | 451 // We need to stop watching handle immediately, even though not on IO thread, |
373 // so that other messages aren't read after this. | 452 // so that other messages aren't read after this. |
374 std::vector<int> serialized_read_fds, serialized_write_fds; | 453 std::vector<int> serialized_read_fds, serialized_write_fds; |
375 if (channel_) { | 454 if (channel_) { |
376 bool write_error = false; | 455 bool write_error = false; |
377 | 456 |
378 serialized_platform_handle_ = channel_->ReleaseHandle( | 457 serialized_platform_handle_ = channel_->ReleaseHandle( |
379 &serialized_read_buffer_, &serialized_write_buffer_, | 458 &serialized_read_buffer_, &serialized_write_buffer_, |
380 &serialized_read_fds, &serialized_write_fds, &write_error); | 459 &serialized_read_fds, &serialized_write_fds, &write_error); |
381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), | 460 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), |
382 serialized_read_fds.end()); | 461 serialized_read_fds.end()); |
383 serialized_read_fds_length_ = serialized_read_fds.size(); | 462 serialized_read_fds_length_ = serialized_read_fds.size(); |
384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), | 463 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), |
385 serialized_write_fds.end()); | 464 serialized_write_fds.end()); |
386 serialized_write_fds_length_ = serialized_write_fds.size(); | 465 serialized_write_fds_length_ = serialized_write_fds.size(); |
387 channel_ = nullptr; | 466 channel_ = nullptr; |
388 if (write_error) | |
389 write_error = true; | |
390 } else { | 467 } else { |
391 // It's valid that the other side wrote some data and closed its end. | 468 // It's valid that the other side wrote some data and closed its end. |
392 } | 469 } |
393 | 470 |
394 DCHECK(serialized_message_queue_.empty()); | 471 DCHECK(serialized_message_queue_.empty()); |
395 while (!message_queue_.IsEmpty()) { | 472 while (!message_queue_.IsEmpty()) { |
396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); | 473 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
397 | 474 |
398 // When MojoWriteMessage is called, the MessageInTransit doesn't have | 475 // When MojoWriteMessage is called, the MessageInTransit doesn't have |
399 // dispatchers set and CreateEquivaent... is called since the dispatchers | 476 // dispatchers set and CreateEquivaent... is called since the dispatchers |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
434 &all_platform_handles->at(0), all_platform_handles->size(), tokens); | 511 &all_platform_handles->at(0), all_platform_handles->size(), tokens); |
435 for (size_t i = 0; i < all_platform_handles->size(); i++) | 512 for (size_t i = 0; i < all_platform_handles->size(); i++) |
436 all_platform_handles->at(i) = PlatformHandle(); | 513 all_platform_handles->at(i) = PlatformHandle(); |
437 #else | 514 #else |
438 for (size_t i = 0; i < all_platform_handles->size(); i++) { | 515 for (size_t i = 0; i < all_platform_handles->size(); i++) { |
439 serialized_fds_.push_back(all_platform_handles->at(i).fd); | 516 serialized_fds_.push_back(all_platform_handles->at(i).fd); |
440 serialized_message_fds_length_++; | 517 serialized_message_fds_length_++; |
441 all_platform_handles->at(i) = PlatformHandle(); | 518 all_platform_handles->at(i) = PlatformHandle(); |
442 } | 519 } |
443 #endif | 520 #endif |
| 521 } |
444 | 522 |
445 serialized_message_queue_.insert( | 523 serialized_message_queue_.insert( |
446 serialized_message_queue_.end(), | 524 serialized_message_queue_.end(), |
447 static_cast<const char*>(message->transport_data()->buffer()), | 525 static_cast<const char*>(message->transport_data()->buffer()), |
448 static_cast<const char*>(message->transport_data()->buffer()) + | 526 static_cast<const char*>(message->transport_data()->buffer()) + |
449 transport_data_buffer_size); | 527 transport_data_buffer_size); |
450 } | |
451 } | 528 } |
452 | 529 |
453 for (size_t i = 0; i < dispatchers.size(); ++i) | 530 for (size_t i = 0; i < dispatchers.size(); ++i) |
454 dispatchers[i]->TransportEnded(); | 531 dispatchers[i]->TransportEnded(); |
455 } | 532 } |
456 | |
457 serialized_ = true; | |
458 } | 533 } |
459 | 534 |
460 scoped_refptr<Dispatcher> | 535 scoped_refptr<Dispatcher> |
461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 536 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
462 lock().AssertAcquired(); | 537 lock().AssertAcquired(); |
463 | 538 |
464 SerializeInternal(); | 539 SerializeInternal(); |
465 | 540 |
466 // TODO(vtl): Currently, there are no options, so we just use | 541 scoped_refptr<MessagePipeDispatcher> rv( |
467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | 542 new MessagePipeDispatcher(transferable_)); |
468 // too. | |
469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); | |
470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); | |
471 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
472 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
473 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
474 serialized_fds_.swap(rv->serialized_fds_); | |
475 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
476 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
477 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
478 rv->serialized_ = true; | 543 rv->serialized_ = true; |
479 rv->write_error_ = write_error_; | 544 if (transferable_) { |
480 return scoped_refptr<Dispatcher>(rv.get()); | 545 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
| 546 serialized_message_queue_.swap(rv->serialized_message_queue_); |
| 547 serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| 548 serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
| 549 serialized_fds_.swap(rv->serialized_fds_); |
| 550 rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
| 551 rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
| 552 rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
| 553 rv->write_error_ = write_error_; |
| 554 } else { |
| 555 rv->pipe_id_ = pipe_id_; |
| 556 rv->non_transferable_state_ = non_transferable_state_; |
| 557 } |
| 558 return rv; |
481 } | 559 } |
482 | 560 |
483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | 561 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
484 const void* bytes, | 562 const void* bytes, |
485 uint32_t num_bytes, | 563 uint32_t num_bytes, |
486 std::vector<DispatcherTransport>* transports, | 564 std::vector<DispatcherTransport>* transports, |
487 MojoWriteMessageFlags flags) { | 565 MojoWriteMessageFlags flags) { |
| 566 lock().AssertAcquired(); |
488 | 567 |
489 DCHECK(!transports || | 568 DCHECK(!transports || |
490 (transports->size() > 0 && | 569 (transports->size() > 0 && |
491 transports->size() <= GetConfiguration().max_message_num_handles)); | 570 transports->size() <= GetConfiguration().max_message_num_handles)); |
492 | 571 |
493 lock().AssertAcquired(); | 572 if (write_error_ || |
494 | 573 (transferable_ && !channel_) || |
495 if (!channel_ || write_error_) | 574 (!transferable_ && non_transferable_state_ == CLOSED)) { |
496 return MOJO_RESULT_FAILED_PRECONDITION; | 575 return MOJO_RESULT_FAILED_PRECONDITION; |
| 576 } |
497 | 577 |
498 if (num_bytes > GetConfiguration().max_message_num_bytes) | 578 if (num_bytes > GetConfiguration().max_message_num_bytes) |
499 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 579 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
500 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 580 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); | 581 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
502 if (transports) { | 582 if (transports) { |
503 MojoResult result = AttachTransportsNoLock(message.get(), transports); | 583 MojoResult result = AttachTransportsNoLock(message.get(), transports); |
504 if (result != MOJO_RESULT_OK) | 584 if (result != MOJO_RESULT_OK) |
505 return result; | 585 return result; |
506 } | 586 } |
507 | 587 |
508 message->SerializeAndCloseDispatchers(); | 588 message->SerializeAndCloseDispatchers(); |
509 channel_->WriteMessage(message.Pass()); | 589 if (!transferable_) |
| 590 message->set_route_id(pipe_id_); |
| 591 if (!transferable_ && |
| 592 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
| 593 non_transferable_state_ == CONNECT_CALLED)) { |
| 594 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 595 RequestNontransferableChannel(); |
| 596 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); |
| 597 } else { |
| 598 channel_->WriteMessage(message.Pass()); |
| 599 } |
510 | 600 |
511 return MOJO_RESULT_OK; | 601 return MOJO_RESULT_OK; |
512 } | 602 } |
513 | 603 |
514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 604 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
515 void* bytes, | 605 void* bytes, |
516 uint32_t* num_bytes, | 606 uint32_t* num_bytes, |
517 DispatcherVector* dispatchers, | 607 DispatcherVector* dispatchers, |
518 uint32_t* num_dispatchers, | 608 uint32_t* num_dispatchers, |
519 MojoReadMessageFlags flags) { | 609 MojoReadMessageFlags flags) { |
520 lock().AssertAcquired(); | 610 lock().AssertAcquired(); |
521 if (channel_) | 611 if (channel_) { |
522 channel_->EnsureLazyInitialized(); | 612 channel_->EnsureLazyInitialized(); |
| 613 } else if (!transferable_) { |
| 614 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 615 RequestNontransferableChannel(); |
| 616 return MOJO_RESULT_SHOULD_WAIT; |
| 617 } else if (non_transferable_state_ == CONNECT_CALLED) { |
| 618 return MOJO_RESULT_SHOULD_WAIT; |
| 619 } |
| 620 } |
| 621 |
523 DCHECK(!dispatchers || dispatchers->empty()); | 622 DCHECK(!dispatchers || dispatchers->empty()); |
524 | 623 |
525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; | 624 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | 625 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
527 | 626 |
528 if (message_queue_.IsEmpty()) | 627 if (message_queue_.IsEmpty()) |
529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; | 628 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; |
530 | 629 |
531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | 630 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
532 // and release the lock immediately. | 631 // and release the lock immediately. |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
576 return MOJO_RESULT_OK; | 675 return MOJO_RESULT_OK; |
577 } | 676 } |
578 | 677 |
579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | 678 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
580 const { | 679 const { |
581 lock().AssertAcquired(); | 680 lock().AssertAcquired(); |
582 | 681 |
583 HandleSignalsState rv; | 682 HandleSignalsState rv; |
584 if (!message_queue_.IsEmpty()) | 683 if (!message_queue_.IsEmpty()) |
585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 684 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
586 if (channel_ || !message_queue_.IsEmpty()) | 685 if (!message_queue_.IsEmpty() || |
| 686 (transferable_ && channel_) || |
| 687 (!transferable_ && non_transferable_state_ != CLOSED)) |
587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
588 if (channel_ && !write_error_) { | 689 if (!write_error_ && |
| 690 ((transferable_ && channel_) || |
| 691 (!transferable_ && non_transferable_state_ != CLOSED))) { |
589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 692 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 693 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
591 } | 694 } |
592 if (!channel_ || write_error_) | 695 if (write_error_ || |
| 696 (transferable_ && !channel_) || |
| 697 (!transferable_ && |
| 698 ((non_transferable_state_ == CLOSED) || is_closed()))) { |
593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 699 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 700 } |
594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 701 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
595 return rv; | 702 return rv; |
596 } | 703 } |
597 | 704 |
598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( | 705 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
599 Awakable* awakable, | 706 Awakable* awakable, |
600 MojoHandleSignals signals, | 707 MojoHandleSignals signals, |
601 uintptr_t context, | 708 uintptr_t context, |
602 HandleSignalsState* signals_state) { | 709 HandleSignalsState* signals_state) { |
603 lock().AssertAcquired(); | 710 lock().AssertAcquired(); |
604 if (channel_) | 711 if (channel_) { |
605 channel_->EnsureLazyInitialized(); | 712 channel_->EnsureLazyInitialized(); |
| 713 } else if (!transferable_ && |
| 714 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 715 RequestNontransferableChannel(); |
| 716 } |
| 717 |
606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 718 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
607 if (state.satisfies(signals)) { | 719 if (state.satisfies(signals)) { |
608 if (signals_state) | 720 if (signals_state) |
609 *signals_state = state; | 721 *signals_state = state; |
610 return MOJO_RESULT_ALREADY_EXISTS; | 722 return MOJO_RESULT_ALREADY_EXISTS; |
611 } | 723 } |
612 if (!state.can_satisfy(signals)) { | 724 if (!state.can_satisfy(signals)) { |
613 if (signals_state) | 725 if (signals_state) |
614 *signals_state = state; | 726 *signals_state = state; |
615 return MOJO_RESULT_FAILED_PRECONDITION; | 727 return MOJO_RESULT_FAILED_PRECONDITION; |
(...skipping 30 matching lines...) Expand all Loading... |
646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); | 758 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); |
647 } | 759 } |
648 | 760 |
649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | 761 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
650 void* destination, | 762 void* destination, |
651 size_t* actual_size, | 763 size_t* actual_size, |
652 PlatformHandleVector* platform_handles) { | 764 PlatformHandleVector* platform_handles) { |
653 CloseImplNoLock(); | 765 CloseImplNoLock(); |
654 SerializedMessagePipeHandleDispatcher* serialization = | 766 SerializedMessagePipeHandleDispatcher* serialization = |
655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); | 767 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
| 768 serialization->transferable = transferable_; |
| 769 serialization->pipe_id = pipe_id_; |
656 if (serialized_platform_handle_.is_valid()) { | 770 if (serialized_platform_handle_.is_valid()) { |
657 serialization->platform_handle_index = platform_handles->size(); | 771 serialization->platform_handle_index = platform_handles->size(); |
658 platform_handles->push_back(serialized_platform_handle_.release()); | 772 platform_handles->push_back(serialized_platform_handle_.release()); |
659 } else { | 773 } else { |
660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; | 774 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
661 } | 775 } |
662 | 776 |
663 serialization->write_error = write_error_; | 777 serialization->write_error = write_error_; |
664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); | 778 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); |
665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); | 779 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); |
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
789 break; | 903 break; |
790 } | 904 } |
791 | 905 |
792 if (started_transport_.Try()) { | 906 if (started_transport_.Try()) { |
793 base::AutoLock locker(lock()); | 907 base::AutoLock locker(lock()); |
794 // We can get two OnError callbacks before the post task below completes. | 908 // We can get two OnError callbacks before the post task below completes. |
795 // Although RawChannel still has a pointer to this object until Shutdown is | 909 // Although RawChannel still has a pointer to this object until Shutdown is |
796 // called, that is safe since this class always does a PostTask to the IO | 910 // called, that is safe since this class always does a PostTask to the IO |
797 // thread to self destruct. | 911 // thread to self destruct. |
798 if (channel_ && error != ERROR_WRITE) { | 912 if (channel_ && error != ERROR_WRITE) { |
799 channel_->Shutdown(); | 913 if (transferable_) { |
| 914 channel_->Shutdown(); |
| 915 } else { |
| 916 CHECK_NE(non_transferable_state_, CLOSED); |
| 917 // Since we're in a callback from the Broker, call it asynchronously. |
| 918 internal::g_io_thread_task_runner->PostTask( |
| 919 FROM_HERE, |
| 920 base::Bind(&Broker::CloseMessagePipe, |
| 921 base::Unretained(internal::g_broker), pipe_id_, |
| 922 base::Unretained(this))); |
| 923 non_transferable_state_ = CLOSED; |
| 924 } |
800 channel_ = nullptr; | 925 channel_ = nullptr; |
801 } | 926 } |
802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 927 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
803 started_transport_.Release(); | 928 started_transport_.Release(); |
804 } else { | 929 } else { |
805 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 930 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
806 } | 931 } |
807 } | 932 } |
808 | 933 |
809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 934 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
810 MessageInTransit* message, | 935 MessageInTransit* message, |
811 std::vector<DispatcherTransport>* transports) { | 936 std::vector<DispatcherTransport>* transports) { |
812 DCHECK(!message->has_dispatchers()); | 937 DCHECK(!message->has_dispatchers()); |
813 | 938 |
814 // You're not allowed to send either handle to a message pipe over the message | 939 // You're not allowed to send either handle to a message pipe over the message |
815 // pipe, so check for this. (The case of trying to write a handle to itself is | 940 // pipe, so check for this. (The case of trying to write a handle to itself is |
816 // taken care of by |Core|. That case kind of makes sense, but leads to | 941 // taken care of by |Core|. That case kind of makes sense, but leads to |
817 // complications if, e.g., both sides try to do the same thing with their | 942 // complications if, e.g., both sides try to do the same thing with their |
818 // respective handles simultaneously. The other case, of trying to write the | 943 // respective handles simultaneously. The other case, of trying to write the |
819 // peer handle to a handle, doesn't make sense -- since no handle will be | 944 // peer handle to a handle, doesn't make sense -- since no handle will be |
820 // available to read the message from.) | 945 // available to read the message from.) |
821 for (size_t i = 0; i < transports->size(); i++) { | 946 for (size_t i = 0; i < transports->size(); i++) { |
822 if (!(*transports)[i].is_valid()) | 947 if (!(*transports)[i].is_valid()) |
823 continue; | 948 continue; |
824 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { | 949 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { |
825 MessagePipeDispatcher* mp = | 950 MessagePipeDispatcher* mp = |
826 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); | 951 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); |
827 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { | 952 if (transferable_ && mp->transferable_ && |
| 953 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { |
828 // The other case should have been disallowed by |Core|. (Note: |port| | 954 // The other case should have been disallowed by |Core|. (Note: |port| |
829 // is the peer port of the handle given to |WriteMessage()|.) | 955 // is the peer port of the handle given to |WriteMessage()|.) |
830 return MOJO_RESULT_INVALID_ARGUMENT; | 956 return MOJO_RESULT_INVALID_ARGUMENT; |
| 957 } else if (!transferable_ && !mp->transferable_ && |
| 958 pipe_id_ == mp->pipe_id_) { |
| 959 return MOJO_RESULT_INVALID_ARGUMENT; |
831 } | 960 } |
832 } | 961 } |
833 } | 962 } |
834 | 963 |
835 // Clone the dispatchers and attach them to the message. (This must be done as | 964 // Clone the dispatchers and attach them to the message. (This must be done as |
836 // a separate loop, since we want to leave the dispatchers alone on failure.) | 965 // a separate loop, since we want to leave the dispatchers alone on failure.) |
837 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | 966 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
838 dispatchers->reserve(transports->size()); | 967 dispatchers->reserve(transports->size()); |
839 for (size_t i = 0; i < transports->size(); i++) { | 968 for (size_t i = 0; i < transports->size(); i++) { |
840 if ((*transports)[i].is_valid()) { | 969 if ((*transports)[i].is_valid()) { |
841 dispatchers->push_back( | 970 dispatchers->push_back( |
842 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 971 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
843 } else { | 972 } else { |
844 LOG(WARNING) << "Enqueueing null dispatcher"; | 973 LOG(WARNING) << "Enqueueing null dispatcher"; |
845 dispatchers->push_back(nullptr); | 974 dispatchers->push_back(nullptr); |
846 } | 975 } |
847 } | 976 } |
848 message->SetDispatchers(dispatchers.Pass()); | 977 message->SetDispatchers(dispatchers.Pass()); |
849 return MOJO_RESULT_OK; | 978 return MOJO_RESULT_OK; |
850 } | 979 } |
851 | 980 |
| 981 void MessagePipeDispatcher::RequestNontransferableChannel() { |
| 982 lock().AssertAcquired(); |
| 983 CHECK(!transferable_); |
| 984 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
| 985 non_transferable_state_ = CONNECT_CALLED; |
| 986 |
| 987 // PostTask since the broker can call us back synchronously. |
| 988 internal::g_io_thread_task_runner->PostTask( |
| 989 FROM_HERE, |
| 990 base::Bind(&Broker::ConnectMessagePipe, |
| 991 base::Unretained(internal::g_broker), pipe_id_, |
| 992 base::Unretained(this))); |
| 993 } |
| 994 |
852 } // namespace edk | 995 } // namespace edk |
853 } // namespace mojo | 996 } // namespace mojo |
OLD | NEW |