OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/logging.h" | 8 #include "base/logging.h" |
9 #include "base/message_loop/message_loop.h" | 9 #include "base/message_loop/message_loop.h" |
10 #include "mojo/edk/embedder/embedder_internal.h" | 10 #include "mojo/edk/embedder/embedder_internal.h" |
11 #include "mojo/edk/embedder/platform_handle_utils.h" | 11 #include "mojo/edk/embedder/platform_handle_utils.h" |
12 #include "mojo/edk/embedder/platform_shared_buffer.h" | 12 #include "mojo/edk/embedder/platform_shared_buffer.h" |
13 #include "mojo/edk/embedder/platform_support.h" | 13 #include "mojo/edk/embedder/platform_support.h" |
14 #include "mojo/edk/system/broker.h" | 14 #include "mojo/edk/system/broker.h" |
15 #include "mojo/edk/system/configuration.h" | 15 #include "mojo/edk/system/configuration.h" |
16 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
17 #include "mojo/edk/system/options_validation.h" | 17 #include "mojo/edk/system/options_validation.h" |
18 #include "mojo/edk/system/transport_data.h" | 18 #include "mojo/edk/system/transport_data.h" |
19 | 19 |
20 namespace mojo { | 20 namespace mojo { |
21 namespace edk { | 21 namespace edk { |
22 | 22 |
23 namespace { | 23 namespace { |
24 | 24 |
25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); | 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
26 | 26 |
27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { | 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
| 28 bool transferable; |
| 29 bool write_error; |
| 30 uint64_t pipe_id; // If transferable is false. |
| 31 // The following members are only set if transferable is true. |
28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP | 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
29 // was closed. | 33 // was closed. |
30 size_t platform_handle_index; | 34 size_t platform_handle_index; |
31 bool write_error; | |
32 | 35 |
33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) | 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
34 uint32_t shared_memory_size; | 37 uint32_t shared_memory_size; |
35 | 38 |
36 size_t serialized_read_buffer_size; | 39 size_t serialized_read_buffer_size; |
37 size_t serialized_write_buffer_size; | 40 size_t serialized_write_buffer_size; |
38 size_t serialized_message_queue_size; | 41 size_t serialized_message_queue_size; |
39 | 42 |
40 // These are the FDs required as part of serializing channel_ and | 43 // These are the FDs required as part of serializing channel_ and |
41 // message_queue_. This is only used on POSIX. | 44 // message_queue_. This is only used on POSIX. |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
83 | 86 |
84 const MojoCreateMessagePipeOptions | 87 const MojoCreateMessagePipeOptions |
85 MessagePipeDispatcher::kDefaultCreateOptions = { | 88 MessagePipeDispatcher::kDefaultCreateOptions = { |
86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
88 | 91 |
89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
90 const MojoCreateMessagePipeOptions* in_options, | 93 const MojoCreateMessagePipeOptions* in_options, |
91 MojoCreateMessagePipeOptions* out_options) { | 94 MojoCreateMessagePipeOptions* out_options) { |
92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
| 97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; |
94 | 98 |
95 *out_options = kDefaultCreateOptions; | 99 *out_options = kDefaultCreateOptions; |
96 if (!in_options) | 100 if (!in_options) |
97 return MOJO_RESULT_OK; | 101 return MOJO_RESULT_OK; |
98 | 102 |
99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
100 if (!reader.is_valid()) | 104 if (!reader.is_valid()) |
101 return MOJO_RESULT_INVALID_ARGUMENT; | 105 return MOJO_RESULT_INVALID_ARGUMENT; |
102 | 106 |
103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
104 return MOJO_RESULT_OK; | 108 return MOJO_RESULT_OK; |
105 if ((reader.options().flags & ~kKnownFlags)) | 109 if ((reader.options().flags & ~kKnownFlags)) |
106 return MOJO_RESULT_UNIMPLEMENTED; | 110 return MOJO_RESULT_UNIMPLEMENTED; |
107 out_options->flags = reader.options().flags; | 111 out_options->flags = reader.options().flags; |
108 | 112 |
109 // Checks for fields beyond |flags|: | 113 // Checks for fields beyond |flags|: |
110 | 114 |
111 // (Nothing here yet.) | 115 // (Nothing here yet.) |
112 | 116 |
113 return MOJO_RESULT_OK; | 117 return MOJO_RESULT_OK; |
114 } | 118 } |
115 | 119 |
116 void MessagePipeDispatcher::Init( | 120 void MessagePipeDispatcher::Init( |
117 ScopedPlatformHandle message_pipe, | 121 ScopedPlatformHandle message_pipe, |
118 char* serialized_read_buffer, size_t serialized_read_buffer_size, | 122 char* serialized_read_buffer, size_t serialized_read_buffer_size, |
119 char* serialized_write_buffer, size_t serialized_write_buffer_size, | 123 char* serialized_write_buffer, size_t serialized_write_buffer_size, |
120 std::vector<int>* serialized_read_fds, | 124 std::vector<int>* serialized_read_fds, |
121 std::vector<int>* serialized_write_fds) { | 125 std::vector<int>* serialized_write_fds) { |
| 126 CHECK(transferable_); |
122 if (message_pipe.get().is_valid()) { | 127 if (message_pipe.get().is_valid()) { |
123 channel_ = RawChannel::Create(message_pipe.Pass()); | 128 channel_ = RawChannel::Create(message_pipe.Pass()); |
124 | 129 |
125 // TODO(jam): It's probably cleaner to pass this in Init call. | 130 // TODO(jam): It's probably cleaner to pass this in Init call. |
126 channel_->SetSerializedData( | 131 channel_->SetSerializedData( |
127 serialized_read_buffer, serialized_read_buffer_size, | 132 serialized_read_buffer, serialized_read_buffer_size, |
128 serialized_write_buffer, serialized_write_buffer_size, | 133 serialized_write_buffer, serialized_write_buffer_size, |
129 serialized_read_fds, serialized_write_fds); | 134 serialized_read_fds, serialized_write_fds); |
130 internal::g_io_thread_task_runner->PostTask( | 135 internal::g_io_thread_task_runner->PostTask( |
131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
132 } | 137 } |
133 } | 138 } |
134 | 139 |
| 140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { |
| 141 CHECK(!transferable_); |
| 142 pipe_id_ = pipe_id; |
| 143 } |
| 144 |
135 void MessagePipeDispatcher::InitOnIO() { | 145 void MessagePipeDispatcher::InitOnIO() { |
136 base::AutoLock locker(lock()); | 146 base::AutoLock locker(lock()); |
| 147 CHECK(transferable_); |
137 calling_init_ = true; | 148 calling_init_ = true; |
138 if (channel_) | 149 if (channel_) |
139 channel_->Init(this); | 150 channel_->Init(this); |
140 calling_init_ = false; | 151 calling_init_ = false; |
141 } | 152 } |
142 | 153 |
143 void MessagePipeDispatcher::CloseOnIO() { | 154 void MessagePipeDispatcher::CloseOnIO() { |
144 base::AutoLock locker(lock()); | 155 base::AutoLock locker(lock()); |
145 | 156 if (transferable_) { |
146 if (channel_) { | 157 if (channel_) { |
147 channel_->Shutdown(); | 158 channel_->Shutdown(); |
148 channel_ = nullptr; | 159 channel_ = nullptr; |
| 160 } |
| 161 } else { |
| 162 if (non_transferable_state_ == CONNECT_CALLED) { |
| 163 // We can't cancel the pending request yet, since the other side of the |
| 164 // message pipe would want to get pending outgoing messages (if any) or |
| 165 // at least know that this end was closed. So keep this object alive until |
| 166 // then. |
| 167 AddRef(); |
| 168 } else if (non_transferable_state_ == CONNECTED) { |
| 169 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| 170 non_transferable_state_ = CLOSED; |
| 171 channel_ = nullptr; |
| 172 } |
149 } | 173 } |
150 } | 174 } |
151 | 175 |
152 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 176 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
153 return Type::MESSAGE_PIPE; | 177 return Type::MESSAGE_PIPE; |
154 } | 178 } |
155 | 179 |
| 180 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| 181 base::AutoLock locker(lock()); |
| 182 channel_ = channel; |
| 183 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| 184 channel_->WriteMessage( |
| 185 non_transferable_outgoing_message_queue_.GetMessage()); |
| 186 } |
| 187 |
| 188 if (is_closed()) { |
| 189 CHECK_EQ(non_transferable_state_, CONNECT_CALLED); |
| 190 // We kept this object alive until it's connected, we can release it now. |
| 191 // Since we're in a callback from the Broker, call it asynchronously. |
| 192 internal::g_io_thread_task_runner->PostTask( |
| 193 FROM_HERE, |
| 194 base::Bind(&Broker::CloseMessagePipe, |
| 195 base::Unretained(internal::g_broker), pipe_id_, |
| 196 base::Unretained(this))); |
| 197 non_transferable_state_ = CLOSED; |
| 198 channel_ = nullptr; |
| 199 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| 200 } else { |
| 201 non_transferable_state_ = CONNECTED; |
| 202 } |
| 203 } |
| 204 |
156 #if defined(OS_WIN) | 205 #if defined(OS_WIN) |
157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 206 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
158 // best way we want to share this. | 207 // best way we want to share this. |
159 // Since this is used for serialization of messages read/written to a MP that | 208 // Since this is used for serialization of messages read/written to a MP that |
160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 209 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
161 // them when a MP is being sent. As a result, even for POSIX we will probably | 210 // them when a MP is being sent. As a result, even for POSIX we will probably |
162 // want to send the handles to the shell process and exchange them for tokens | 211 // want to send the handles to the shell process and exchange them for tokens |
163 // (since we can be sure that the shell will respond to our IPCs, compared to | 212 // (since we can be sure that the shell will respond to our IPCs, compared to |
164 // the other end where we're sending the MP to, which may not be reading...). | 213 // the other end where we're sending the MP to, which may not be reading...). |
165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | 214 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
(...skipping 13 matching lines...) Expand all Loading... |
179 const void* source, | 228 const void* source, |
180 size_t size, | 229 size_t size, |
181 PlatformHandleVector* platform_handles) { | 230 PlatformHandleVector* platform_handles) { |
182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { | 231 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { |
183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; | 232 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; |
184 return nullptr; | 233 return nullptr; |
185 } | 234 } |
186 | 235 |
187 const SerializedMessagePipeHandleDispatcher* serialization = | 236 const SerializedMessagePipeHandleDispatcher* serialization = |
188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); | 237 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
| 238 |
| 239 scoped_refptr<MessagePipeDispatcher> rv( |
| 240 new MessagePipeDispatcher(serialization->transferable)); |
| 241 if (!rv->transferable_) { |
| 242 rv->InitNonTransferable(serialization->pipe_id); |
| 243 return rv; |
| 244 } |
| 245 |
189 if (serialization->shared_memory_size != | 246 if (serialization->shared_memory_size != |
190 (serialization->serialized_read_buffer_size + | 247 (serialization->serialized_read_buffer_size + |
191 serialization->serialized_write_buffer_size + | 248 serialization->serialized_write_buffer_size + |
192 serialization->serialized_message_queue_size)) { | 249 serialization->serialized_message_queue_size)) { |
193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; | 250 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; |
194 return nullptr; | 251 return nullptr; |
195 } | 252 } |
196 | 253 |
197 ScopedPlatformHandle platform_handle, shared_memory_handle; | 254 ScopedPlatformHandle platform_handle, shared_memory_handle; |
198 if (!GetHandle(serialization->platform_handle_index, | 255 if (!GetHandle(serialization->platform_handle_index, |
(...skipping 27 matching lines...) Expand all Loading... |
226 serialization->serialized_write_buffer_size; | 283 serialization->serialized_write_buffer_size; |
227 buffer += serialized_write_buffer_size; | 284 buffer += serialized_write_buffer_size; |
228 } | 285 } |
229 if (serialization->serialized_message_queue_size) { | 286 if (serialization->serialized_message_queue_size) { |
230 message_queue_data = buffer; | 287 message_queue_data = buffer; |
231 message_queue_size = serialization->serialized_message_queue_size; | 288 message_queue_size = serialization->serialized_message_queue_size; |
232 buffer += message_queue_size; | 289 buffer += message_queue_size; |
233 } | 290 } |
234 } | 291 } |
235 | 292 |
236 scoped_refptr<MessagePipeDispatcher> rv( | |
237 Create(MessagePipeDispatcher::kDefaultCreateOptions)); | |
238 rv->write_error_ = serialization->write_error; | 293 rv->write_error_ = serialization->write_error; |
239 | 294 |
240 std::vector<int> serialized_read_fds; | 295 std::vector<int> serialized_read_fds; |
241 std::vector<int> serialized_write_fds; | 296 std::vector<int> serialized_write_fds; |
242 #if defined(OS_POSIX) | 297 #if defined(OS_POSIX) |
243 std::vector<int> serialized_fds; | 298 std::vector<int> serialized_fds; |
244 size_t serialized_fds_index = 0; | 299 size_t serialized_fds_index = 0; |
245 | 300 |
246 size_t total_fd_count = serialization->serialized_read_fds_length + | 301 size_t total_fd_count = serialization->serialized_read_fds_length + |
247 serialization->serialized_write_fds_length + | 302 serialization->serialized_write_fds_length + |
(...skipping 14 matching lines...) Expand all Loading... |
262 serialized_fds_index += serialization->serialized_read_fds_length; | 317 serialized_fds_index += serialization->serialized_read_fds_length; |
263 serialized_write_fds.assign( | 318 serialized_write_fds.assign( |
264 serialized_fds.begin() + serialized_fds_index, | 319 serialized_fds.begin() + serialized_fds_index, |
265 serialized_fds.begin() + serialized_fds_index + | 320 serialized_fds.begin() + serialized_fds_index + |
266 serialization->serialized_write_fds_length); | 321 serialization->serialized_write_fds_length); |
267 serialized_fds_index += serialization->serialized_write_fds_length; | 322 serialized_fds_index += serialization->serialized_write_fds_length; |
268 #endif | 323 #endif |
269 | 324 |
270 while (message_queue_size) { | 325 while (message_queue_size) { |
271 size_t message_size; | 326 size_t message_size; |
272 CHECK(MessageInTransit::GetNextMessageSize( | 327 if (!MessageInTransit::GetNextMessageSize( |
273 message_queue_data, message_queue_size, &message_size)); | 328 message_queue_data, message_queue_size, &message_size)) { |
| 329 NOTREACHED() << "Couldn't read message size from serialized data."; |
| 330 return nullptr; |
| 331 } |
| 332 if (message_size > message_queue_size) { |
| 333 NOTREACHED() << "Invalid serialized message size."; |
| 334 return nullptr; |
| 335 } |
274 MessageInTransit::View message_view(message_size, message_queue_data); | 336 MessageInTransit::View message_view(message_size, message_queue_data); |
275 message_queue_size -= message_size; | 337 message_queue_size -= message_size; |
276 message_queue_data += message_size; | 338 message_queue_data += message_size; |
277 | 339 |
278 // TODO(jam): Copied below from RawChannelWin. See commment above | 340 // TODO(jam): Copied below from RawChannelWin. See commment above |
279 // GetReadPlatformHandles. | 341 // GetReadPlatformHandles. |
280 ScopedPlatformHandleVectorPtr temp_platform_handles; | 342 ScopedPlatformHandleVectorPtr temp_platform_handles; |
281 if (message_view.transport_data_buffer()) { | 343 if (message_view.transport_data_buffer()) { |
282 size_t num_platform_handles; | 344 size_t num_platform_handles; |
283 const void* platform_handle_table; | 345 const void* platform_handle_table; |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
326 &serialized_write_fds); | 388 &serialized_write_fds); |
327 | 389 |
328 if (message_queue_size) { // Should be empty by now. | 390 if (message_queue_size) { // Should be empty by now. |
329 LOG(ERROR) << "Invalid queued messages"; | 391 LOG(ERROR) << "Invalid queued messages"; |
330 return nullptr; | 392 return nullptr; |
331 } | 393 } |
332 | 394 |
333 return rv; | 395 return rv; |
334 } | 396 } |
335 | 397 |
336 MessagePipeDispatcher::MessagePipeDispatcher() | 398 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
337 : channel_(nullptr), | 399 : channel_(nullptr), |
338 serialized_(false), | |
339 serialized_read_fds_length_(0u), | 400 serialized_read_fds_length_(0u), |
340 serialized_write_fds_length_(0u), | 401 serialized_write_fds_length_(0u), |
341 serialized_message_fds_length_(0u), | 402 serialized_message_fds_length_(0u), |
| 403 pipe_id_(0), |
| 404 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
| 405 serialized_(false), |
342 calling_init_(false), | 406 calling_init_(false), |
343 write_error_(false) { | 407 write_error_(false), |
| 408 transferable_(transferable) { |
344 } | 409 } |
345 | 410 |
346 MessagePipeDispatcher::~MessagePipeDispatcher() { | 411 MessagePipeDispatcher::~MessagePipeDispatcher() { |
347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 412 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
348 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 413 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
349 // down and so when it was deleting pending tasks it caused the last reference | 414 // down and so when it was deleting pending tasks it caused the last reference |
350 // to destruct this object. In that case, safe to destroy the channel. | 415 // to destruct this object. In that case, safe to destroy the channel. |
351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 416 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
352 channel_->Shutdown(); | 417 channel_->Shutdown(); |
353 else | 418 else |
354 DCHECK(!channel_); | 419 DCHECK(!channel_); |
355 #if defined(OS_POSIX) | 420 #if defined(OS_POSIX) |
356 ClosePlatformHandles(&serialized_fds_); | 421 ClosePlatformHandles(&serialized_fds_); |
357 #endif | 422 #endif |
358 } | 423 } |
359 | 424 |
360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { | 425 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
361 lock().AssertAcquired(); | 426 lock().AssertAcquired(); |
362 awakable_list_.CancelAll(); | 427 awakable_list_.CancelAll(); |
363 } | 428 } |
364 | 429 |
365 void MessagePipeDispatcher::CloseImplNoLock() { | 430 void MessagePipeDispatcher::CloseImplNoLock() { |
366 lock().AssertAcquired(); | 431 lock().AssertAcquired(); |
367 internal::g_io_thread_task_runner->PostTask( | 432 internal::g_io_thread_task_runner->PostTask( |
368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 433 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
369 } | 434 } |
370 | 435 |
371 void MessagePipeDispatcher::SerializeInternal() { | 436 void MessagePipeDispatcher::SerializeInternal() { |
| 437 serialized_ = true; |
| 438 if (!transferable_) { |
| 439 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 440 << "Non transferable message pipe being sent after read/write. " |
| 441 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
| 442 << "the pipe can be sent after it's read or written."; |
| 443 return; |
| 444 } |
| 445 |
372 // We need to stop watching handle immediately, even though not on IO thread, | 446 // We need to stop watching handle immediately, even though not on IO thread, |
373 // so that other messages aren't read after this. | 447 // so that other messages aren't read after this. |
374 std::vector<int> serialized_read_fds, serialized_write_fds; | 448 std::vector<int> serialized_read_fds, serialized_write_fds; |
375 if (channel_) { | 449 if (channel_) { |
376 bool write_error = false; | 450 bool write_error = false; |
377 | 451 |
378 serialized_platform_handle_ = channel_->ReleaseHandle( | 452 serialized_platform_handle_ = channel_->ReleaseHandle( |
379 &serialized_read_buffer_, &serialized_write_buffer_, | 453 &serialized_read_buffer_, &serialized_write_buffer_, |
380 &serialized_read_fds, &serialized_write_fds, &write_error); | 454 &serialized_read_fds, &serialized_write_fds, &write_error); |
381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), | 455 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), |
382 serialized_read_fds.end()); | 456 serialized_read_fds.end()); |
383 serialized_read_fds_length_ = serialized_read_fds.size(); | 457 serialized_read_fds_length_ = serialized_read_fds.size(); |
384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), | 458 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), |
385 serialized_write_fds.end()); | 459 serialized_write_fds.end()); |
386 serialized_write_fds_length_ = serialized_write_fds.size(); | 460 serialized_write_fds_length_ = serialized_write_fds.size(); |
387 channel_ = nullptr; | 461 channel_ = nullptr; |
388 if (write_error) | |
389 write_error = true; | |
390 } else { | 462 } else { |
391 // It's valid that the other side wrote some data and closed its end. | 463 // It's valid that the other side wrote some data and closed its end. |
392 } | 464 } |
393 | 465 |
394 DCHECK(serialized_message_queue_.empty()); | 466 DCHECK(serialized_message_queue_.empty()); |
395 while (!message_queue_.IsEmpty()) { | 467 while (!message_queue_.IsEmpty()) { |
396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); | 468 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
397 | 469 |
398 // When MojoWriteMessage is called, the MessageInTransit doesn't have | 470 // When MojoWriteMessage is called, the MessageInTransit doesn't have |
399 // dispatchers set and CreateEquivaent... is called since the dispatchers | 471 // dispatchers set and CreateEquivaent... is called since the dispatchers |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
434 &all_platform_handles->at(0), all_platform_handles->size(), tokens); | 506 &all_platform_handles->at(0), all_platform_handles->size(), tokens); |
435 for (size_t i = 0; i < all_platform_handles->size(); i++) | 507 for (size_t i = 0; i < all_platform_handles->size(); i++) |
436 all_platform_handles->at(i) = PlatformHandle(); | 508 all_platform_handles->at(i) = PlatformHandle(); |
437 #else | 509 #else |
438 for (size_t i = 0; i < all_platform_handles->size(); i++) { | 510 for (size_t i = 0; i < all_platform_handles->size(); i++) { |
439 serialized_fds_.push_back(all_platform_handles->at(i).fd); | 511 serialized_fds_.push_back(all_platform_handles->at(i).fd); |
440 serialized_message_fds_length_++; | 512 serialized_message_fds_length_++; |
441 all_platform_handles->at(i) = PlatformHandle(); | 513 all_platform_handles->at(i) = PlatformHandle(); |
442 } | 514 } |
443 #endif | 515 #endif |
| 516 } |
444 | 517 |
445 serialized_message_queue_.insert( | 518 serialized_message_queue_.insert( |
446 serialized_message_queue_.end(), | 519 serialized_message_queue_.end(), |
447 static_cast<const char*>(message->transport_data()->buffer()), | 520 static_cast<const char*>(message->transport_data()->buffer()), |
448 static_cast<const char*>(message->transport_data()->buffer()) + | 521 static_cast<const char*>(message->transport_data()->buffer()) + |
449 transport_data_buffer_size); | 522 transport_data_buffer_size); |
450 } | |
451 } | 523 } |
452 | 524 |
453 for (size_t i = 0; i < dispatchers.size(); ++i) | 525 for (size_t i = 0; i < dispatchers.size(); ++i) |
454 dispatchers[i]->TransportEnded(); | 526 dispatchers[i]->TransportEnded(); |
455 } | 527 } |
456 | |
457 serialized_ = true; | |
458 } | 528 } |
459 | 529 |
460 scoped_refptr<Dispatcher> | 530 scoped_refptr<Dispatcher> |
461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 531 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
462 lock().AssertAcquired(); | 532 lock().AssertAcquired(); |
463 | 533 |
464 SerializeInternal(); | 534 SerializeInternal(); |
465 | 535 |
466 // TODO(vtl): Currently, there are no options, so we just use | 536 scoped_refptr<MessagePipeDispatcher> rv( |
467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | 537 new MessagePipeDispatcher(transferable_)); |
468 // too. | |
469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); | |
470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); | |
471 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
472 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
473 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
474 serialized_fds_.swap(rv->serialized_fds_); | |
475 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
476 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
477 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
478 rv->serialized_ = true; | 538 rv->serialized_ = true; |
479 rv->write_error_ = write_error_; | 539 if (transferable_) { |
480 return scoped_refptr<Dispatcher>(rv.get()); | 540 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
| 541 serialized_message_queue_.swap(rv->serialized_message_queue_); |
| 542 serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| 543 serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
| 544 serialized_fds_.swap(rv->serialized_fds_); |
| 545 rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
| 546 rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
| 547 rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
| 548 rv->write_error_ = write_error_; |
| 549 } else { |
| 550 rv->pipe_id_ = pipe_id_; |
| 551 rv->non_transferable_state_ = non_transferable_state_; |
| 552 } |
| 553 return rv; |
481 } | 554 } |
482 | 555 |
483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | 556 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
484 const void* bytes, | 557 const void* bytes, |
485 uint32_t num_bytes, | 558 uint32_t num_bytes, |
486 std::vector<DispatcherTransport>* transports, | 559 std::vector<DispatcherTransport>* transports, |
487 MojoWriteMessageFlags flags) { | 560 MojoWriteMessageFlags flags) { |
| 561 lock().AssertAcquired(); |
488 | 562 |
489 DCHECK(!transports || | 563 DCHECK(!transports || |
490 (transports->size() > 0 && | 564 (transports->size() > 0 && |
491 transports->size() <= GetConfiguration().max_message_num_handles)); | 565 transports->size() <= GetConfiguration().max_message_num_handles)); |
492 | 566 |
493 lock().AssertAcquired(); | 567 if (write_error_ || |
494 | 568 (transferable_ && !channel_) || |
495 if (!channel_ || write_error_) | 569 (!transferable_ && non_transferable_state_ == CLOSED)) { |
496 return MOJO_RESULT_FAILED_PRECONDITION; | 570 return MOJO_RESULT_FAILED_PRECONDITION; |
| 571 } |
497 | 572 |
498 if (num_bytes > GetConfiguration().max_message_num_bytes) | 573 if (num_bytes > GetConfiguration().max_message_num_bytes) |
499 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 574 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
500 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 575 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); | 576 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
502 if (transports) { | 577 if (transports) { |
503 MojoResult result = AttachTransportsNoLock(message.get(), transports); | 578 MojoResult result = AttachTransportsNoLock(message.get(), transports); |
504 if (result != MOJO_RESULT_OK) | 579 if (result != MOJO_RESULT_OK) |
505 return result; | 580 return result; |
506 } | 581 } |
507 | 582 |
508 message->SerializeAndCloseDispatchers(); | 583 message->SerializeAndCloseDispatchers(); |
509 channel_->WriteMessage(message.Pass()); | 584 if (!transferable_) |
| 585 message->set_route_id(pipe_id_); |
| 586 if (!transferable_ && |
| 587 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
| 588 non_transferable_state_ == CONNECT_CALLED)) { |
| 589 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 590 RequestNontransferableChannel(); |
| 591 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); |
| 592 } else { |
| 593 channel_->WriteMessage(message.Pass()); |
| 594 } |
510 | 595 |
511 return MOJO_RESULT_OK; | 596 return MOJO_RESULT_OK; |
512 } | 597 } |
513 | 598 |
514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 599 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
515 void* bytes, | 600 void* bytes, |
516 uint32_t* num_bytes, | 601 uint32_t* num_bytes, |
517 DispatcherVector* dispatchers, | 602 DispatcherVector* dispatchers, |
518 uint32_t* num_dispatchers, | 603 uint32_t* num_dispatchers, |
519 MojoReadMessageFlags flags) { | 604 MojoReadMessageFlags flags) { |
520 lock().AssertAcquired(); | 605 lock().AssertAcquired(); |
521 if (channel_) | 606 if (channel_) { |
522 channel_->EnsureLazyInitialized(); | 607 channel_->EnsureLazyInitialized(); |
| 608 } else if (!transferable_) { |
| 609 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 610 RequestNontransferableChannel(); |
| 611 return MOJO_RESULT_SHOULD_WAIT; |
| 612 } else if (non_transferable_state_ == CONNECT_CALLED) { |
| 613 return MOJO_RESULT_SHOULD_WAIT; |
| 614 } |
| 615 } |
| 616 |
523 DCHECK(!dispatchers || dispatchers->empty()); | 617 DCHECK(!dispatchers || dispatchers->empty()); |
524 | 618 |
525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; | 619 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | 620 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
527 | 621 |
528 if (message_queue_.IsEmpty()) | 622 if (message_queue_.IsEmpty()) |
529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; | 623 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; |
530 | 624 |
531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | 625 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
532 // and release the lock immediately. | 626 // and release the lock immediately. |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
576 return MOJO_RESULT_OK; | 670 return MOJO_RESULT_OK; |
577 } | 671 } |
578 | 672 |
579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | 673 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
580 const { | 674 const { |
581 lock().AssertAcquired(); | 675 lock().AssertAcquired(); |
582 | 676 |
583 HandleSignalsState rv; | 677 HandleSignalsState rv; |
584 if (!message_queue_.IsEmpty()) | 678 if (!message_queue_.IsEmpty()) |
585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 679 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
586 if (channel_ || !message_queue_.IsEmpty()) | 680 if (!message_queue_.IsEmpty() || |
| 681 (transferable_ && channel_) || |
| 682 (!transferable_ && non_transferable_state_ != CLOSED)) |
587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 683 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
588 if (channel_ && !write_error_) { | 684 if (!write_error_ && |
| 685 ((transferable_ && channel_) || |
| 686 (!transferable_ && non_transferable_state_ != CLOSED))) { |
589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 687 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
591 } | 689 } |
592 if (!channel_ || write_error_) | 690 if (write_error_ || |
| 691 (transferable_ && !channel_) || |
| 692 (!transferable_ && non_transferable_state_ == CLOSED)) { |
593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 693 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 694 } |
594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 695 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
595 return rv; | 696 return rv; |
596 } | 697 } |
597 | 698 |
598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( | 699 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
599 Awakable* awakable, | 700 Awakable* awakable, |
600 MojoHandleSignals signals, | 701 MojoHandleSignals signals, |
601 uintptr_t context, | 702 uintptr_t context, |
602 HandleSignalsState* signals_state) { | 703 HandleSignalsState* signals_state) { |
603 lock().AssertAcquired(); | 704 lock().AssertAcquired(); |
604 if (channel_) | 705 if (channel_) { |
605 channel_->EnsureLazyInitialized(); | 706 channel_->EnsureLazyInitialized(); |
| 707 } else if (!transferable_ && |
| 708 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| 709 RequestNontransferableChannel(); |
| 710 } |
| 711 |
606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 712 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
607 if (state.satisfies(signals)) { | 713 if (state.satisfies(signals)) { |
608 if (signals_state) | 714 if (signals_state) |
609 *signals_state = state; | 715 *signals_state = state; |
610 return MOJO_RESULT_ALREADY_EXISTS; | 716 return MOJO_RESULT_ALREADY_EXISTS; |
611 } | 717 } |
612 if (!state.can_satisfy(signals)) { | 718 if (!state.can_satisfy(signals)) { |
613 if (signals_state) | 719 if (signals_state) |
614 *signals_state = state; | 720 *signals_state = state; |
615 return MOJO_RESULT_FAILED_PRECONDITION; | 721 return MOJO_RESULT_FAILED_PRECONDITION; |
(...skipping 30 matching lines...) Expand all Loading... |
646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); | 752 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); |
647 } | 753 } |
648 | 754 |
649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | 755 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
650 void* destination, | 756 void* destination, |
651 size_t* actual_size, | 757 size_t* actual_size, |
652 PlatformHandleVector* platform_handles) { | 758 PlatformHandleVector* platform_handles) { |
653 CloseImplNoLock(); | 759 CloseImplNoLock(); |
654 SerializedMessagePipeHandleDispatcher* serialization = | 760 SerializedMessagePipeHandleDispatcher* serialization = |
655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); | 761 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
| 762 serialization->transferable = transferable_; |
| 763 serialization->pipe_id = pipe_id_; |
656 if (serialized_platform_handle_.is_valid()) { | 764 if (serialized_platform_handle_.is_valid()) { |
657 serialization->platform_handle_index = platform_handles->size(); | 765 serialization->platform_handle_index = platform_handles->size(); |
658 platform_handles->push_back(serialized_platform_handle_.release()); | 766 platform_handles->push_back(serialized_platform_handle_.release()); |
659 } else { | 767 } else { |
660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; | 768 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
661 } | 769 } |
662 | 770 |
663 serialization->write_error = write_error_; | 771 serialization->write_error = write_error_; |
664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); | 772 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); |
665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); | 773 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); |
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
789 break; | 897 break; |
790 } | 898 } |
791 | 899 |
792 if (started_transport_.Try()) { | 900 if (started_transport_.Try()) { |
793 base::AutoLock locker(lock()); | 901 base::AutoLock locker(lock()); |
794 // We can get two OnError callbacks before the post task below completes. | 902 // We can get two OnError callbacks before the post task below completes. |
795 // Although RawChannel still has a pointer to this object until Shutdown is | 903 // Although RawChannel still has a pointer to this object until Shutdown is |
796 // called, that is safe since this class always does a PostTask to the IO | 904 // called, that is safe since this class always does a PostTask to the IO |
797 // thread to self destruct. | 905 // thread to self destruct. |
798 if (channel_ && error != ERROR_WRITE) { | 906 if (channel_ && error != ERROR_WRITE) { |
799 channel_->Shutdown(); | 907 if (transferable_) { |
| 908 channel_->Shutdown(); |
| 909 } else { |
| 910 CHECK_NE(non_transferable_state_, CLOSED); |
| 911 // Since we're in a callback from the Broker, call it asynchronously. |
| 912 internal::g_io_thread_task_runner->PostTask( |
| 913 FROM_HERE, |
| 914 base::Bind(&Broker::CloseMessagePipe, |
| 915 base::Unretained(internal::g_broker), pipe_id_, |
| 916 base::Unretained(this))); |
| 917 non_transferable_state_ = CLOSED; |
| 918 } |
800 channel_ = nullptr; | 919 channel_ = nullptr; |
801 } | 920 } |
802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 921 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
803 started_transport_.Release(); | 922 started_transport_.Release(); |
804 } else { | 923 } else { |
805 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 924 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
806 } | 925 } |
807 } | 926 } |
808 | 927 |
809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 928 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
810 MessageInTransit* message, | 929 MessageInTransit* message, |
811 std::vector<DispatcherTransport>* transports) { | 930 std::vector<DispatcherTransport>* transports) { |
812 DCHECK(!message->has_dispatchers()); | 931 DCHECK(!message->has_dispatchers()); |
813 | 932 |
814 // You're not allowed to send either handle to a message pipe over the message | 933 // You're not allowed to send either handle to a message pipe over the message |
815 // pipe, so check for this. (The case of trying to write a handle to itself is | 934 // pipe, so check for this. (The case of trying to write a handle to itself is |
816 // taken care of by |Core|. That case kind of makes sense, but leads to | 935 // taken care of by |Core|. That case kind of makes sense, but leads to |
817 // complications if, e.g., both sides try to do the same thing with their | 936 // complications if, e.g., both sides try to do the same thing with their |
818 // respective handles simultaneously. The other case, of trying to write the | 937 // respective handles simultaneously. The other case, of trying to write the |
819 // peer handle to a handle, doesn't make sense -- since no handle will be | 938 // peer handle to a handle, doesn't make sense -- since no handle will be |
820 // available to read the message from.) | 939 // available to read the message from.) |
821 for (size_t i = 0; i < transports->size(); i++) { | 940 for (size_t i = 0; i < transports->size(); i++) { |
822 if (!(*transports)[i].is_valid()) | 941 if (!(*transports)[i].is_valid()) |
823 continue; | 942 continue; |
824 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { | 943 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { |
825 MessagePipeDispatcher* mp = | 944 MessagePipeDispatcher* mp = |
826 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); | 945 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); |
827 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { | 946 if (transferable_ && mp->transferable_ && |
| 947 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { |
828 // The other case should have been disallowed by |Core|. (Note: |port| | 948 // The other case should have been disallowed by |Core|. (Note: |port| |
829 // is the peer port of the handle given to |WriteMessage()|.) | 949 // is the peer port of the handle given to |WriteMessage()|.) |
830 return MOJO_RESULT_INVALID_ARGUMENT; | 950 return MOJO_RESULT_INVALID_ARGUMENT; |
| 951 } else if (!transferable_ && !mp->transferable_ && |
| 952 pipe_id_ == mp->pipe_id_) { |
| 953 return MOJO_RESULT_INVALID_ARGUMENT; |
831 } | 954 } |
832 } | 955 } |
833 } | 956 } |
834 | 957 |
835 // Clone the dispatchers and attach them to the message. (This must be done as | 958 // Clone the dispatchers and attach them to the message. (This must be done as |
836 // a separate loop, since we want to leave the dispatchers alone on failure.) | 959 // a separate loop, since we want to leave the dispatchers alone on failure.) |
837 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | 960 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
838 dispatchers->reserve(transports->size()); | 961 dispatchers->reserve(transports->size()); |
839 for (size_t i = 0; i < transports->size(); i++) { | 962 for (size_t i = 0; i < transports->size(); i++) { |
840 if ((*transports)[i].is_valid()) { | 963 if ((*transports)[i].is_valid()) { |
841 dispatchers->push_back( | 964 dispatchers->push_back( |
842 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 965 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
843 } else { | 966 } else { |
844 LOG(WARNING) << "Enqueueing null dispatcher"; | 967 LOG(WARNING) << "Enqueueing null dispatcher"; |
845 dispatchers->push_back(nullptr); | 968 dispatchers->push_back(nullptr); |
846 } | 969 } |
847 } | 970 } |
848 message->SetDispatchers(dispatchers.Pass()); | 971 message->SetDispatchers(dispatchers.Pass()); |
849 return MOJO_RESULT_OK; | 972 return MOJO_RESULT_OK; |
850 } | 973 } |
851 | 974 |
| 975 void MessagePipeDispatcher::RequestNontransferableChannel() { |
| 976 lock().AssertAcquired(); |
| 977 CHECK(!transferable_); |
| 978 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
| 979 non_transferable_state_ = CONNECT_CALLED; |
| 980 |
| 981 // PostTask since the broker can call us back synchronously. |
| 982 internal::g_io_thread_task_runner->PostTask( |
| 983 FROM_HERE, |
| 984 base::Bind(&Broker::ConnectMessagePipe, |
| 985 base::Unretained(internal::g_broker), pipe_id_, |
| 986 base::Unretained(this))); |
| 987 } |
| 988 |
852 } // namespace edk | 989 } // namespace edk |
853 } // namespace mojo | 990 } // namespace mojo |
OLD | NEW |