OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/logging.h" | 8 #include "base/logging.h" |
9 #include "base/message_loop/message_loop.h" | 9 #include "base/message_loop/message_loop.h" |
10 #include "mojo/edk/embedder/embedder_internal.h" | 10 #include "mojo/edk/embedder/embedder_internal.h" |
11 #include "mojo/edk/embedder/platform_handle_utils.h" | 11 #include "mojo/edk/embedder/platform_handle_utils.h" |
12 #include "mojo/edk/embedder/platform_shared_buffer.h" | 12 #include "mojo/edk/embedder/platform_shared_buffer.h" |
13 #include "mojo/edk/embedder/platform_support.h" | 13 #include "mojo/edk/embedder/platform_support.h" |
14 #include "mojo/edk/system/broker.h" | 14 #include "mojo/edk/system/broker.h" |
15 #include "mojo/edk/system/configuration.h" | 15 #include "mojo/edk/system/configuration.h" |
16 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
17 #include "mojo/edk/system/options_validation.h" | 17 #include "mojo/edk/system/options_validation.h" |
18 #include "mojo/edk/system/transport_data.h" | 18 #include "mojo/edk/system/transport_data.h" |
19 | 19 |
20 namespace mojo { | 20 namespace mojo { |
21 namespace edk { | 21 namespace edk { |
22 | 22 |
23 namespace { | 23 namespace { |
24 | 24 |
25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); | 25 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
26 | 26 |
27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { | 27 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
28 bool transferable; | |
29 bool write_error; | |
30 uint64_t pipe_id; // If transferable is false. | |
31 // The following members are only set if transferable is true. | |
28 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP | 32 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
29 // was closed. | 33 // was closed. |
30 size_t platform_handle_index; | 34 size_t platform_handle_index; |
31 bool write_error; | |
32 | 35 |
33 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) | 36 size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
34 uint32_t shared_memory_size; | 37 uint32_t shared_memory_size; |
35 | 38 |
36 size_t serialized_read_buffer_size; | 39 size_t serialized_read_buffer_size; |
37 size_t serialized_write_buffer_size; | 40 size_t serialized_write_buffer_size; |
38 size_t serialized_message_queue_size; | 41 size_t serialized_message_queue_size; |
39 | 42 |
40 // These are the FDs required as part of serializing channel_ and | 43 // These are the FDs required as part of serializing channel_ and |
41 // message_queue_. This is only used on POSIX. | 44 // message_queue_. This is only used on POSIX. |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
83 | 86 |
84 const MojoCreateMessagePipeOptions | 87 const MojoCreateMessagePipeOptions |
85 MessagePipeDispatcher::kDefaultCreateOptions = { | 88 MessagePipeDispatcher::kDefaultCreateOptions = { |
86 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | 89 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
87 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | 90 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
88 | 91 |
89 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | 92 MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
90 const MojoCreateMessagePipeOptions* in_options, | 93 const MojoCreateMessagePipeOptions* in_options, |
91 MojoCreateMessagePipeOptions* out_options) { | 94 MojoCreateMessagePipeOptions* out_options) { |
92 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | 95 const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
93 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | 96 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
97 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; | |
94 | 98 |
95 *out_options = kDefaultCreateOptions; | 99 *out_options = kDefaultCreateOptions; |
96 if (!in_options) | 100 if (!in_options) |
97 return MOJO_RESULT_OK; | 101 return MOJO_RESULT_OK; |
98 | 102 |
99 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | 103 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
100 if (!reader.is_valid()) | 104 if (!reader.is_valid()) |
101 return MOJO_RESULT_INVALID_ARGUMENT; | 105 return MOJO_RESULT_INVALID_ARGUMENT; |
102 | 106 |
103 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | 107 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
104 return MOJO_RESULT_OK; | 108 return MOJO_RESULT_OK; |
105 if ((reader.options().flags & ~kKnownFlags)) | 109 if ((reader.options().flags & ~kKnownFlags)) |
106 return MOJO_RESULT_UNIMPLEMENTED; | 110 return MOJO_RESULT_UNIMPLEMENTED; |
107 out_options->flags = reader.options().flags; | 111 out_options->flags = reader.options().flags; |
108 | 112 |
109 // Checks for fields beyond |flags|: | 113 // Checks for fields beyond |flags|: |
110 | 114 |
111 // (Nothing here yet.) | 115 // (Nothing here yet.) |
112 | 116 |
113 return MOJO_RESULT_OK; | 117 return MOJO_RESULT_OK; |
114 } | 118 } |
115 | 119 |
116 void MessagePipeDispatcher::Init( | 120 void MessagePipeDispatcher::Init( |
117 ScopedPlatformHandle message_pipe, | 121 ScopedPlatformHandle message_pipe, |
118 char* serialized_read_buffer, size_t serialized_read_buffer_size, | 122 char* serialized_read_buffer, size_t serialized_read_buffer_size, |
119 char* serialized_write_buffer, size_t serialized_write_buffer_size, | 123 char* serialized_write_buffer, size_t serialized_write_buffer_size, |
120 std::vector<int>* serialized_read_fds, | 124 std::vector<int>* serialized_read_fds, |
121 std::vector<int>* serialized_write_fds) { | 125 std::vector<int>* serialized_write_fds) { |
126 CHECK(transferable_); | |
122 if (message_pipe.get().is_valid()) { | 127 if (message_pipe.get().is_valid()) { |
123 channel_ = RawChannel::Create(message_pipe.Pass()); | 128 channel_ = RawChannel::Create(message_pipe.Pass()); |
124 | 129 |
125 // TODO(jam): It's probably cleaner to pass this in Init call. | 130 // TODO(jam): It's probably cleaner to pass this in Init call. |
126 channel_->SetSerializedData( | 131 channel_->SetSerializedData( |
127 serialized_read_buffer, serialized_read_buffer_size, | 132 serialized_read_buffer, serialized_read_buffer_size, |
128 serialized_write_buffer, serialized_write_buffer_size, | 133 serialized_write_buffer, serialized_write_buffer_size, |
129 serialized_read_fds, serialized_write_fds); | 134 serialized_read_fds, serialized_write_fds); |
130 internal::g_io_thread_task_runner->PostTask( | 135 internal::g_io_thread_task_runner->PostTask( |
131 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | 136 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
132 } | 137 } |
133 } | 138 } |
134 | 139 |
140 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { | |
141 CHECK(!transferable_); | |
142 pipe_id_ = pipe_id; | |
143 } | |
144 | |
135 void MessagePipeDispatcher::InitOnIO() { | 145 void MessagePipeDispatcher::InitOnIO() { |
136 base::AutoLock locker(lock()); | 146 base::AutoLock locker(lock()); |
147 CHECK(transferable_); | |
137 calling_init_ = true; | 148 calling_init_ = true; |
138 if (channel_) | 149 if (channel_) |
139 channel_->Init(this); | 150 channel_->Init(this); |
140 calling_init_ = false; | 151 calling_init_ = false; |
141 } | 152 } |
142 | 153 |
143 void MessagePipeDispatcher::CloseOnIO() { | 154 void MessagePipeDispatcher::CloseOnIO() { |
144 base::AutoLock locker(lock()); | 155 base::AutoLock locker(lock()); |
145 | 156 if (transferable_) { |
146 if (channel_) { | 157 if (channel_) { |
147 channel_->Shutdown(); | 158 channel_->Shutdown(); |
148 channel_ = nullptr; | 159 channel_ = nullptr; |
160 } | |
161 } else { | |
162 if (non_transferable_state_ == CONNECT_CALLED) { | |
163 // We can't cancel the pending request yet, since the other side of the | |
164 // message pipe would want to get pending outgoing messages (if any) or | |
165 // at least know that this end was closed. So keep this object alive until | |
166 // then. | |
167 AddRef(); | |
168 } else if (non_transferable_state_ == CONNECTED) { | |
169 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
170 non_transferable_state_ = CLOSED; | |
171 channel_ = nullptr; | |
172 } | |
149 } | 173 } |
150 } | 174 } |
151 | 175 |
152 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 176 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
153 return Type::MESSAGE_PIPE; | 177 return Type::MESSAGE_PIPE; |
154 } | 178 } |
155 | 179 |
180 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | |
181 base::AutoLock locker(lock()); | |
182 channel_ = channel; | |
183 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | |
184 channel_->WriteMessage( | |
185 non_transferable_outgoing_message_queue_.GetMessage()); | |
186 } | |
187 | |
188 if (is_closed()) { | |
189 CHECK_EQ(non_transferable_state_, CONNECT_CALLED); | |
190 // We kept this object alive until it's connected, we can release it now. | |
191 // Since we're in a callback from the Broker, call it asynchronously. | |
192 internal::g_io_thread_task_runner->PostTask( | |
193 FROM_HERE, | |
194 base::Bind(&Broker::CloseMessagePipe, | |
195 base::Unretained(internal::g_broker), pipe_id_, | |
196 base::Unretained(this))); | |
197 non_transferable_state_ = CLOSED; | |
198 channel_ = nullptr; | |
199 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | |
200 } else { | |
201 non_transferable_state_ = CONNECTED; | |
202 } | |
203 } | |
204 | |
156 #if defined(OS_WIN) | 205 #if defined(OS_WIN) |
157 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 206 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
158 // best way we want to share this. | 207 // best way we want to share this. |
159 // Since this is used for serialization of messages read/written to a MP that | 208 // Since this is used for serialization of messages read/written to a MP that |
160 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 209 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
161 // them when a MP is being sent. As a result, even for POSIX we will probably | 210 // them when a MP is being sent. As a result, even for POSIX we will probably |
162 // want to send the handles to the shell process and exchange them for tokens | 211 // want to send the handles to the shell process and exchange them for tokens |
163 // (since we can be sure that the shell will respond to our IPCs, compared to | 212 // (since we can be sure that the shell will respond to our IPCs, compared to |
164 // the other end where we're sending the MP to, which may not be reading...). | 213 // the other end where we're sending the MP to, which may not be reading...). |
165 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | 214 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
(...skipping 13 matching lines...) Expand all Loading... | |
179 const void* source, | 228 const void* source, |
180 size_t size, | 229 size_t size, |
181 PlatformHandleVector* platform_handles) { | 230 PlatformHandleVector* platform_handles) { |
182 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { | 231 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { |
183 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; | 232 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; |
184 return nullptr; | 233 return nullptr; |
185 } | 234 } |
186 | 235 |
187 const SerializedMessagePipeHandleDispatcher* serialization = | 236 const SerializedMessagePipeHandleDispatcher* serialization = |
188 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); | 237 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
238 | |
239 scoped_refptr<MessagePipeDispatcher> rv( | |
240 new MessagePipeDispatcher(serialization->transferable)); | |
241 if (!rv->transferable_) { | |
242 rv->InitNonTransferable(serialization->pipe_id); | |
243 return rv; | |
244 } | |
245 | |
189 if (serialization->shared_memory_size != | 246 if (serialization->shared_memory_size != |
190 (serialization->serialized_read_buffer_size + | 247 (serialization->serialized_read_buffer_size + |
191 serialization->serialized_write_buffer_size + | 248 serialization->serialized_write_buffer_size + |
192 serialization->serialized_message_queue_size)) { | 249 serialization->serialized_message_queue_size)) { |
193 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; | 250 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; |
194 return nullptr; | 251 return nullptr; |
195 } | 252 } |
196 | 253 |
197 ScopedPlatformHandle platform_handle, shared_memory_handle; | 254 ScopedPlatformHandle platform_handle, shared_memory_handle; |
198 if (!GetHandle(serialization->platform_handle_index, | 255 if (!GetHandle(serialization->platform_handle_index, |
(...skipping 27 matching lines...) Expand all Loading... | |
226 serialization->serialized_write_buffer_size; | 283 serialization->serialized_write_buffer_size; |
227 buffer += serialized_write_buffer_size; | 284 buffer += serialized_write_buffer_size; |
228 } | 285 } |
229 if (serialization->serialized_message_queue_size) { | 286 if (serialization->serialized_message_queue_size) { |
230 message_queue_data = buffer; | 287 message_queue_data = buffer; |
231 message_queue_size = serialization->serialized_message_queue_size; | 288 message_queue_size = serialization->serialized_message_queue_size; |
232 buffer += message_queue_size; | 289 buffer += message_queue_size; |
233 } | 290 } |
234 } | 291 } |
235 | 292 |
236 scoped_refptr<MessagePipeDispatcher> rv( | |
237 Create(MessagePipeDispatcher::kDefaultCreateOptions)); | |
238 rv->write_error_ = serialization->write_error; | 293 rv->write_error_ = serialization->write_error; |
239 | 294 |
240 std::vector<int> serialized_read_fds; | 295 std::vector<int> serialized_read_fds; |
241 std::vector<int> serialized_write_fds; | 296 std::vector<int> serialized_write_fds; |
242 #if defined(OS_POSIX) | 297 #if defined(OS_POSIX) |
243 std::vector<int> serialized_fds; | 298 std::vector<int> serialized_fds; |
244 size_t serialized_fds_index = 0; | 299 size_t serialized_fds_index = 0; |
245 | 300 |
246 size_t total_fd_count = serialization->serialized_read_fds_length + | 301 size_t total_fd_count = serialization->serialized_read_fds_length + |
247 serialization->serialized_write_fds_length + | 302 serialization->serialized_write_fds_length + |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
326 &serialized_write_fds); | 381 &serialized_write_fds); |
327 | 382 |
328 if (message_queue_size) { // Should be empty by now. | 383 if (message_queue_size) { // Should be empty by now. |
329 LOG(ERROR) << "Invalid queued messages"; | 384 LOG(ERROR) << "Invalid queued messages"; |
330 return nullptr; | 385 return nullptr; |
331 } | 386 } |
332 | 387 |
333 return rv; | 388 return rv; |
334 } | 389 } |
335 | 390 |
336 MessagePipeDispatcher::MessagePipeDispatcher() | 391 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
337 : channel_(nullptr), | 392 : channel_(nullptr), |
338 serialized_(false), | |
339 serialized_read_fds_length_(0u), | 393 serialized_read_fds_length_(0u), |
340 serialized_write_fds_length_(0u), | 394 serialized_write_fds_length_(0u), |
341 serialized_message_fds_length_(0u), | 395 serialized_message_fds_length_(0u), |
396 pipe_id_(0), | |
397 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | |
398 serialized_(false), | |
342 calling_init_(false), | 399 calling_init_(false), |
343 write_error_(false) { | 400 write_error_(false), |
401 transferable_(transferable) { | |
344 } | 402 } |
345 | 403 |
346 MessagePipeDispatcher::~MessagePipeDispatcher() { | 404 MessagePipeDispatcher::~MessagePipeDispatcher() { |
347 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 405 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
348 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 406 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
349 // down and so when it was deleting pending tasks it caused the last reference | 407 // down and so when it was deleting pending tasks it caused the last reference |
350 // to destruct this object. In that case, safe to destroy the channel. | 408 // to destruct this object. In that case, safe to destroy the channel. |
351 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 409 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
352 channel_->Shutdown(); | 410 channel_->Shutdown(); |
353 else | 411 else |
354 DCHECK(!channel_); | 412 DCHECK(!channel_); |
355 #if defined(OS_POSIX) | 413 #if defined(OS_POSIX) |
356 ClosePlatformHandles(&serialized_fds_); | 414 ClosePlatformHandles(&serialized_fds_); |
357 #endif | 415 #endif |
358 } | 416 } |
359 | 417 |
360 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { | 418 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
361 lock().AssertAcquired(); | 419 lock().AssertAcquired(); |
362 awakable_list_.CancelAll(); | 420 awakable_list_.CancelAll(); |
363 } | 421 } |
364 | 422 |
365 void MessagePipeDispatcher::CloseImplNoLock() { | 423 void MessagePipeDispatcher::CloseImplNoLock() { |
366 lock().AssertAcquired(); | 424 lock().AssertAcquired(); |
367 internal::g_io_thread_task_runner->PostTask( | 425 internal::g_io_thread_task_runner->PostTask( |
368 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 426 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
369 } | 427 } |
370 | 428 |
371 void MessagePipeDispatcher::SerializeInternal() { | 429 void MessagePipeDispatcher::SerializeInternal() { |
430 serialized_ = true; | |
431 if (!transferable_) { | |
432 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
433 << "Non transferable message pipe being sent after read/write. " | |
434 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | |
435 << "the pipe can be sent after it's read or written."; | |
436 return; | |
437 } | |
438 | |
372 // We need to stop watching handle immediately, even though not on IO thread, | 439 // We need to stop watching handle immediately, even though not on IO thread, |
373 // so that other messages aren't read after this. | 440 // so that other messages aren't read after this. |
374 std::vector<int> serialized_read_fds, serialized_write_fds; | 441 std::vector<int> serialized_read_fds, serialized_write_fds; |
375 if (channel_) { | 442 if (channel_) { |
376 bool write_error = false; | 443 bool write_error = false; |
377 | 444 |
378 serialized_platform_handle_ = channel_->ReleaseHandle( | 445 serialized_platform_handle_ = channel_->ReleaseHandle( |
379 &serialized_read_buffer_, &serialized_write_buffer_, | 446 &serialized_read_buffer_, &serialized_write_buffer_, |
380 &serialized_read_fds, &serialized_write_fds, &write_error); | 447 &serialized_read_fds, &serialized_write_fds, &write_error); |
381 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), | 448 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), |
382 serialized_read_fds.end()); | 449 serialized_read_fds.end()); |
383 serialized_read_fds_length_ = serialized_read_fds.size(); | 450 serialized_read_fds_length_ = serialized_read_fds.size(); |
384 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), | 451 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), |
385 serialized_write_fds.end()); | 452 serialized_write_fds.end()); |
386 serialized_write_fds_length_ = serialized_write_fds.size(); | 453 serialized_write_fds_length_ = serialized_write_fds.size(); |
387 channel_ = nullptr; | 454 channel_ = nullptr; |
388 if (write_error) | |
389 write_error = true; | |
390 } else { | 455 } else { |
391 // It's valid that the other side wrote some data and closed its end. | 456 // It's valid that the other side wrote some data and closed its end. |
392 } | 457 } |
393 | 458 |
394 DCHECK(serialized_message_queue_.empty()); | 459 DCHECK(serialized_message_queue_.empty()); |
395 while (!message_queue_.IsEmpty()) { | 460 while (!message_queue_.IsEmpty()) { |
396 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); | 461 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
397 | 462 |
398 // When MojoWriteMessage is called, the MessageInTransit doesn't have | 463 // When MojoWriteMessage is called, the MessageInTransit doesn't have |
399 // dispatchers set and CreateEquivaent... is called since the dispatchers | 464 // dispatchers set and CreateEquivaent... is called since the dispatchers |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
446 serialized_message_queue_.end(), | 511 serialized_message_queue_.end(), |
447 static_cast<const char*>(message->transport_data()->buffer()), | 512 static_cast<const char*>(message->transport_data()->buffer()), |
448 static_cast<const char*>(message->transport_data()->buffer()) + | 513 static_cast<const char*>(message->transport_data()->buffer()) + |
449 transport_data_buffer_size); | 514 transport_data_buffer_size); |
450 } | 515 } |
451 } | 516 } |
452 | 517 |
453 for (size_t i = 0; i < dispatchers.size(); ++i) | 518 for (size_t i = 0; i < dispatchers.size(); ++i) |
454 dispatchers[i]->TransportEnded(); | 519 dispatchers[i]->TransportEnded(); |
455 } | 520 } |
456 | |
457 serialized_ = true; | |
458 } | 521 } |
459 | 522 |
460 scoped_refptr<Dispatcher> | 523 scoped_refptr<Dispatcher> |
461 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 524 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
462 lock().AssertAcquired(); | 525 lock().AssertAcquired(); |
463 | 526 |
464 SerializeInternal(); | 527 SerializeInternal(); |
465 | 528 |
466 // TODO(vtl): Currently, there are no options, so we just use | 529 // TODO(vtl): Currently, there are no options, so we just use |
yzshen1
2015/12/03 23:37:50
I think this comment is not useful anymore.
jam
2015/12/04 05:06:47
Done.
| |
467 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | 530 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options |
468 // too. | 531 // too. |
469 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); | 532 scoped_refptr<MessagePipeDispatcher> rv( |
470 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); | 533 new MessagePipeDispatcher(transferable_)); |
471 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
472 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
473 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
474 serialized_fds_.swap(rv->serialized_fds_); | |
475 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
476 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
477 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
478 rv->serialized_ = true; | 534 rv->serialized_ = true; |
479 rv->write_error_ = write_error_; | 535 if (transferable_) { |
480 return scoped_refptr<Dispatcher>(rv.get()); | 536 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
537 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
538 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
539 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
540 serialized_fds_.swap(rv->serialized_fds_); | |
541 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
542 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
543 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
544 rv->write_error_ = write_error_; | |
545 } else { | |
546 rv->pipe_id_ = pipe_id_; | |
547 rv->non_transferable_state_ = non_transferable_state_; | |
548 } | |
549 return rv; | |
481 } | 550 } |
482 | 551 |
483 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | 552 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
484 const void* bytes, | 553 const void* bytes, |
485 uint32_t num_bytes, | 554 uint32_t num_bytes, |
486 std::vector<DispatcherTransport>* transports, | 555 std::vector<DispatcherTransport>* transports, |
487 MojoWriteMessageFlags flags) { | 556 MojoWriteMessageFlags flags) { |
557 lock().AssertAcquired(); | |
488 | 558 |
489 DCHECK(!transports || | 559 DCHECK(!transports || |
490 (transports->size() > 0 && | 560 (transports->size() > 0 && |
491 transports->size() <= GetConfiguration().max_message_num_handles)); | 561 transports->size() <= GetConfiguration().max_message_num_handles)); |
492 | 562 |
493 lock().AssertAcquired(); | 563 if (write_error_ || |
494 | 564 (transferable_ && !channel_) || |
495 if (!channel_ || write_error_) | 565 (!transferable_ && non_transferable_state_ == CLOSED)) { |
496 return MOJO_RESULT_FAILED_PRECONDITION; | 566 return MOJO_RESULT_FAILED_PRECONDITION; |
567 } | |
497 | 568 |
498 if (num_bytes > GetConfiguration().max_message_num_bytes) | 569 if (num_bytes > GetConfiguration().max_message_num_bytes) |
499 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 570 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
500 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 571 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
501 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); | 572 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
502 if (transports) { | 573 if (transports) { |
503 MojoResult result = AttachTransportsNoLock(message.get(), transports); | 574 MojoResult result = AttachTransportsNoLock(message.get(), transports); |
504 if (result != MOJO_RESULT_OK) | 575 if (result != MOJO_RESULT_OK) |
505 return result; | 576 return result; |
506 } | 577 } |
507 | 578 |
508 message->SerializeAndCloseDispatchers(); | 579 message->SerializeAndCloseDispatchers(); |
509 channel_->WriteMessage(message.Pass()); | 580 if (!transferable_) |
581 message->set_route_id(pipe_id_); | |
582 if (!transferable_ && | |
583 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || | |
584 non_transferable_state_ == CONNECT_CALLED)) { | |
585 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
586 RequestNontransferableChannel(); | |
587 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); | |
588 } else { | |
589 channel_->WriteMessage(message.Pass()); | |
590 } | |
510 | 591 |
511 return MOJO_RESULT_OK; | 592 return MOJO_RESULT_OK; |
512 } | 593 } |
513 | 594 |
514 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 595 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
515 void* bytes, | 596 void* bytes, |
516 uint32_t* num_bytes, | 597 uint32_t* num_bytes, |
517 DispatcherVector* dispatchers, | 598 DispatcherVector* dispatchers, |
518 uint32_t* num_dispatchers, | 599 uint32_t* num_dispatchers, |
519 MojoReadMessageFlags flags) { | 600 MojoReadMessageFlags flags) { |
520 lock().AssertAcquired(); | 601 lock().AssertAcquired(); |
521 if (channel_) | 602 if (channel_) { |
522 channel_->EnsureLazyInitialized(); | 603 channel_->EnsureLazyInitialized(); |
604 } else if (!transferable_ && | |
605 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
606 RequestNontransferableChannel(); | |
607 return MOJO_RESULT_SHOULD_WAIT; | |
608 } | |
609 | |
523 DCHECK(!dispatchers || dispatchers->empty()); | 610 DCHECK(!dispatchers || dispatchers->empty()); |
524 | 611 |
525 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; | 612 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
526 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | 613 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
527 | 614 |
528 if (message_queue_.IsEmpty()) | 615 if (message_queue_.IsEmpty()) |
529 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; | 616 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; |
530 | 617 |
531 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | 618 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
532 // and release the lock immediately. | 619 // and release the lock immediately. |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
576 return MOJO_RESULT_OK; | 663 return MOJO_RESULT_OK; |
577 } | 664 } |
578 | 665 |
579 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | 666 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
580 const { | 667 const { |
581 lock().AssertAcquired(); | 668 lock().AssertAcquired(); |
582 | 669 |
583 HandleSignalsState rv; | 670 HandleSignalsState rv; |
584 if (!message_queue_.IsEmpty()) | 671 if (!message_queue_.IsEmpty()) |
585 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 672 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
586 if (channel_ || !message_queue_.IsEmpty()) | 673 if (!message_queue_.IsEmpty() || |
674 (transferable_ && channel_) || | |
675 (!transferable_ && non_transferable_state_ != CLOSED)) | |
587 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 676 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
588 if (channel_ && !write_error_) { | 677 if (!write_error_ && |
678 ((transferable_ && channel_) || | |
679 (!transferable_ && non_transferable_state_ != CLOSED))) { | |
589 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 680 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
590 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 681 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
591 } | 682 } |
592 if (!channel_ || write_error_) | 683 if (write_error_ || |
684 (transferable_ && !channel_) || | |
685 (!transferable_ && non_transferable_state_ == CLOSED)) { | |
593 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 686 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
687 } | |
594 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 688 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
595 return rv; | 689 return rv; |
596 } | 690 } |
597 | 691 |
598 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( | 692 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
599 Awakable* awakable, | 693 Awakable* awakable, |
600 MojoHandleSignals signals, | 694 MojoHandleSignals signals, |
601 uintptr_t context, | 695 uintptr_t context, |
602 HandleSignalsState* signals_state) { | 696 HandleSignalsState* signals_state) { |
603 lock().AssertAcquired(); | 697 lock().AssertAcquired(); |
604 if (channel_) | 698 if (channel_) { |
605 channel_->EnsureLazyInitialized(); | 699 channel_->EnsureLazyInitialized(); |
700 } else if (!transferable_ && | |
701 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
702 RequestNontransferableChannel(); | |
703 } | |
704 | |
606 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 705 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
607 if (state.satisfies(signals)) { | 706 if (state.satisfies(signals)) { |
608 if (signals_state) | 707 if (signals_state) |
609 *signals_state = state; | 708 *signals_state = state; |
610 return MOJO_RESULT_ALREADY_EXISTS; | 709 return MOJO_RESULT_ALREADY_EXISTS; |
611 } | 710 } |
612 if (!state.can_satisfy(signals)) { | 711 if (!state.can_satisfy(signals)) { |
613 if (signals_state) | 712 if (signals_state) |
614 *signals_state = state; | 713 *signals_state = state; |
615 return MOJO_RESULT_FAILED_PRECONDITION; | 714 return MOJO_RESULT_FAILED_PRECONDITION; |
(...skipping 30 matching lines...) Expand all Loading... | |
646 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); | 745 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); |
647 } | 746 } |
648 | 747 |
649 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | 748 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
650 void* destination, | 749 void* destination, |
651 size_t* actual_size, | 750 size_t* actual_size, |
652 PlatformHandleVector* platform_handles) { | 751 PlatformHandleVector* platform_handles) { |
653 CloseImplNoLock(); | 752 CloseImplNoLock(); |
654 SerializedMessagePipeHandleDispatcher* serialization = | 753 SerializedMessagePipeHandleDispatcher* serialization = |
655 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); | 754 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
755 serialization->transferable = transferable_; | |
756 serialization->pipe_id = pipe_id_; | |
656 if (serialized_platform_handle_.is_valid()) { | 757 if (serialized_platform_handle_.is_valid()) { |
657 serialization->platform_handle_index = platform_handles->size(); | 758 serialization->platform_handle_index = platform_handles->size(); |
658 platform_handles->push_back(serialized_platform_handle_.release()); | 759 platform_handles->push_back(serialized_platform_handle_.release()); |
659 } else { | 760 } else { |
660 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; | 761 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
661 } | 762 } |
662 | 763 |
663 serialization->write_error = write_error_; | 764 serialization->write_error = write_error_; |
664 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); | 765 serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); |
665 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); | 766 serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); |
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
789 break; | 890 break; |
790 } | 891 } |
791 | 892 |
792 if (started_transport_.Try()) { | 893 if (started_transport_.Try()) { |
793 base::AutoLock locker(lock()); | 894 base::AutoLock locker(lock()); |
794 // We can get two OnError callbacks before the post task below completes. | 895 // We can get two OnError callbacks before the post task below completes. |
795 // Although RawChannel still has a pointer to this object until Shutdown is | 896 // Although RawChannel still has a pointer to this object until Shutdown is |
796 // called, that is safe since this class always does a PostTask to the IO | 897 // called, that is safe since this class always does a PostTask to the IO |
797 // thread to self destruct. | 898 // thread to self destruct. |
798 if (channel_ && error != ERROR_WRITE) { | 899 if (channel_ && error != ERROR_WRITE) { |
799 channel_->Shutdown(); | 900 if (transferable_) { |
901 channel_->Shutdown(); | |
902 } else { | |
903 CHECK_NE(non_transferable_state_, CLOSED); | |
904 // Since we're in a callback from the Broker, call it asynchronously. | |
905 internal::g_io_thread_task_runner->PostTask( | |
906 FROM_HERE, | |
907 base::Bind(&Broker::CloseMessagePipe, | |
908 base::Unretained(internal::g_broker), pipe_id_, | |
909 base::Unretained(this))); | |
910 non_transferable_state_ = CLOSED; | |
911 } | |
800 channel_ = nullptr; | 912 channel_ = nullptr; |
801 } | 913 } |
802 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 914 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
803 started_transport_.Release(); | 915 started_transport_.Release(); |
804 } else { | 916 } else { |
805 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 917 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
806 } | 918 } |
807 } | 919 } |
808 | 920 |
809 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 921 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
842 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 954 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
843 } else { | 955 } else { |
844 LOG(WARNING) << "Enqueueing null dispatcher"; | 956 LOG(WARNING) << "Enqueueing null dispatcher"; |
845 dispatchers->push_back(nullptr); | 957 dispatchers->push_back(nullptr); |
846 } | 958 } |
847 } | 959 } |
848 message->SetDispatchers(dispatchers.Pass()); | 960 message->SetDispatchers(dispatchers.Pass()); |
849 return MOJO_RESULT_OK; | 961 return MOJO_RESULT_OK; |
850 } | 962 } |
851 | 963 |
964 void MessagePipeDispatcher::RequestNontransferableChannel() { | |
965 lock().AssertAcquired(); | |
966 CHECK(!transferable_); | |
967 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); | |
968 non_transferable_state_ = CONNECT_CALLED; | |
969 | |
970 // PostTask since the broker can call us back synchronously. | |
971 internal::g_io_thread_task_runner->PostTask( | |
972 FROM_HERE, | |
973 base::Bind(&Broker::ConnectMessagePipe, | |
974 base::Unretained(internal::g_broker), pipe_id_, | |
975 base::Unretained(this))); | |
976 } | |
977 | |
852 } // namespace edk | 978 } // namespace edk |
853 } // namespace mojo | 979 } // namespace mojo |
OLD | NEW |