OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | |
8 #include <stdint.h> | |
9 | |
10 #include <limits> | 7 #include <limits> |
11 #include <utility> | 8 |
12 #include <vector> | 9 #include "base/macros.h" |
13 | 10 #include "base/memory/ref_counted.h" |
14 #include "base/bind.h" | 11 #include "base/memory/scoped_ptr.h" |
15 #include "base/debug/stack_trace.h" | |
16 #include "base/logging.h" | |
17 #include "base/message_loop/message_loop.h" | |
18 #include "mojo/edk/embedder/embedder_internal.h" | 12 #include "mojo/edk/embedder/embedder_internal.h" |
19 #include "mojo/edk/embedder/platform_handle_utils.h" | 13 #include "mojo/edk/system/core.h" |
20 #include "mojo/edk/embedder/platform_shared_buffer.h" | 14 #include "mojo/edk/system/node_controller.h" |
21 #include "mojo/edk/embedder/platform_support.h" | 15 #include "mojo/edk/system/ports_message.h" |
22 #include "mojo/edk/system/broker.h" | 16 #include "mojo/public/c/system/macros.h" |
23 #include "mojo/edk/system/configuration.h" | |
24 #include "mojo/edk/system/message_in_transit.h" | |
25 #include "mojo/edk/system/options_validation.h" | |
26 #include "mojo/edk/system/transport_data.h" | |
27 | 17 |
28 namespace mojo { | 18 namespace mojo { |
29 namespace edk { | 19 namespace edk { |
30 | 20 |
31 namespace { | 21 namespace { |
32 | 22 |
33 const uint32_t kInvalidMessagePipeHandleIndex = static_cast<uint32_t>(-1); | 23 // Header attached to every message sent over a message pipe. |
34 | 24 struct MOJO_ALIGNAS(8) MessageHeader { |
35 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { | 25 // The number of serialized dispatchers included in this header. |
36 MOJO_ALIGNAS(1) bool transferable; | 26 uint32_t num_dispatchers; |
37 MOJO_ALIGNAS(1) bool write_error; | 27 |
38 MOJO_ALIGNAS(8) uint64_t pipe_id; // If transferable is false. | 28 // Total size of the header, including serialized dispatcher data. |
39 // The following members are only set if transferable is true. | 29 uint32_t header_size; |
40 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP | |
41 // was closed. | |
42 MOJO_ALIGNAS(4) uint32_t platform_handle_index; | |
43 | |
44 MOJO_ALIGNAS(4) uint32_t | |
45 shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) | |
46 MOJO_ALIGNAS(4) uint32_t shared_memory_size; | |
47 | |
48 MOJO_ALIGNAS(4) uint32_t serialized_read_buffer_size; | |
49 MOJO_ALIGNAS(4) uint32_t serialized_write_buffer_size; | |
50 MOJO_ALIGNAS(4) uint32_t serialized_message_queue_size; | |
51 | |
52 // These are the FDs required as part of serializing channel_ and | |
53 // message_queue_. This is only used on POSIX. | |
54 MOJO_ALIGNAS(4) | |
55 uint32_t serialized_fds_index; // (Or |kInvalidMessagePipeHandleIndex|.) | |
56 MOJO_ALIGNAS(4) uint32_t serialized_read_fds_length; | |
57 MOJO_ALIGNAS(4) uint32_t serialized_write_fds_length; | |
58 MOJO_ALIGNAS(4) uint32_t serialized_message_fds_length; | |
59 }; | 30 }; |
60 | 31 |
61 char* SerializeBuffer(char* start, std::vector<char>* buffer) { | 32 // Header for each dispatcher, immediately following the message header. |
62 if (buffer->size()) | 33 struct MOJO_ALIGNAS(8) DispatcherHeader { |
63 memcpy(start, &(*buffer)[0], buffer->size()); | 34 // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. |
64 return start + buffer->size(); | 35 int32_t type; |
65 } | 36 |
66 | 37 // The size of the serialized dispatcher, not including this header. |
67 bool GetHandle(size_t index, | 38 uint32_t num_bytes; |
68 PlatformHandleVector* platform_handles, | 39 |
69 ScopedPlatformHandle* handle) { | 40 // The number of ports needed to deserialize this dispatcher. |
70 if (index == kInvalidMessagePipeHandleIndex) | 41 uint32_t num_ports; |
71 return true; | 42 |
72 | 43 // The number of platform handles needed to deserialize this dispatcher. |
73 if (!platform_handles || index >= platform_handles->size()) { | 44 uint32_t num_platform_handles; |
74 LOG(ERROR) | 45 }; |
75 << "Invalid serialized message pipe dispatcher (missing handles)"; | 46 |
76 return false; | 47 struct MOJO_ALIGNAS(8) SerializedState { |
77 } | 48 uint64_t pipe_id; |
78 | 49 int8_t endpoint; |
79 // We take ownership of the handle, so we have to invalidate the one in | 50 }; |
80 // |platform_handles|. | |
81 handle->reset((*platform_handles)[index]); | |
82 (*platform_handles)[index] = PlatformHandle(); | |
83 return true; | |
84 } | |
85 | |
86 #if defined(OS_POSIX) | |
87 void ClosePlatformHandles(std::vector<int>* fds) { | |
88 for (size_t i = 0; i < fds->size(); ++i) | |
89 PlatformHandle((*fds)[i]).CloseIfNecessary(); | |
90 } | |
91 #endif | |
92 | 51 |
93 } // namespace | 52 } // namespace |
94 | 53 |
95 // MessagePipeDispatcher ------------------------------------------------------- | 54 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a |
96 | 55 // reference to the MPD to ensure it lives as long as the observed port. |
97 const MojoCreateMessagePipeOptions | 56 class MessagePipeDispatcher::PortObserverThunk |
98 MessagePipeDispatcher::kDefaultCreateOptions = { | 57 : public NodeController::PortObserver { |
99 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | 58 public: |
100 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | 59 explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher) |
101 | 60 : dispatcher_(dispatcher) {} |
102 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | 61 |
103 const MojoCreateMessagePipeOptions* in_options, | 62 private: |
104 MojoCreateMessagePipeOptions* out_options) { | 63 ~PortObserverThunk() override {} |
105 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | 64 |
106 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | | 65 // NodeController::PortObserver: |
107 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; | 66 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
108 | 67 |
109 *out_options = kDefaultCreateOptions; | 68 scoped_refptr<MessagePipeDispatcher> dispatcher_; |
110 if (!in_options) | 69 |
111 return MOJO_RESULT_OK; | 70 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
112 | 71 }; |
113 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | 72 |
114 if (!reader.is_valid()) | 73 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
115 return MOJO_RESULT_INVALID_ARGUMENT; | 74 const ports::PortRef& port, |
116 | 75 uint64_t pipe_id, |
117 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | 76 int endpoint) |
118 return MOJO_RESULT_OK; | 77 : node_controller_(node_controller), |
119 if ((reader.options().flags & ~kKnownFlags)) | 78 port_(port), |
120 return MOJO_RESULT_UNIMPLEMENTED; | 79 pipe_id_(pipe_id), |
121 out_options->flags = reader.options().flags; | 80 endpoint_(endpoint) { |
122 | 81 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
123 // Checks for fields beyond |flags|: | 82 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
124 | 83 |
125 // (Nothing here yet.) | 84 node_controller_->SetPortObserver( |
126 | 85 port_, |
127 return MOJO_RESULT_OK; | 86 make_scoped_refptr(new PortObserverThunk(this))); |
128 } | |
129 | |
130 void MessagePipeDispatcher::Init( | |
131 ScopedPlatformHandle message_pipe, | |
132 char* serialized_read_buffer, size_t serialized_read_buffer_size, | |
133 char* serialized_write_buffer, size_t serialized_write_buffer_size, | |
134 std::vector<int>* serialized_read_fds, | |
135 std::vector<int>* serialized_write_fds) { | |
136 CHECK(transferable_); | |
137 if (message_pipe.get().is_valid()) { | |
138 channel_ = RawChannel::Create(std::move(message_pipe)); | |
139 | |
140 // TODO(jam): It's probably cleaner to pass this in Init call. | |
141 channel_->SetSerializedData( | |
142 serialized_read_buffer, serialized_read_buffer_size, | |
143 serialized_write_buffer, serialized_write_buffer_size, | |
144 serialized_read_fds, serialized_write_fds); | |
145 internal::g_io_thread_task_runner->PostTask( | |
146 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | |
147 } | |
148 } | |
149 | |
150 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { | |
151 CHECK(!transferable_); | |
152 pipe_id_ = pipe_id; | |
153 } | |
154 | |
155 void MessagePipeDispatcher::InitOnIO() { | |
156 base::AutoLock locker(lock()); | |
157 CHECK(transferable_); | |
158 calling_init_ = true; | |
159 if (channel_) | |
160 channel_->Init(this); | |
161 calling_init_ = false; | |
162 } | |
163 | |
164 void MessagePipeDispatcher::CloseOnIOAndRelease() { | |
165 { | |
166 base::AutoLock locker(lock()); | |
167 CloseOnIO(); | |
168 } | |
169 Release(); // To match CloseImplNoLock. | |
170 } | |
171 | |
172 void MessagePipeDispatcher::CloseOnIO() { | |
173 lock().AssertAcquired(); | |
174 | |
175 if (channel_) { | |
176 // If we closed the channel now then in-flight message pipes wouldn't get | |
177 // closed, and their other side wouldn't get a connection error notification | |
178 // which could lead to hangs or leaks. So we ask the other side of this | |
179 // message pipe to close, which ensures that we have dispatched all | |
180 // in-flight message pipes. | |
181 DCHECK(!close_requested_); | |
182 close_requested_ = true; | |
183 AddRef(); | |
184 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
185 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); | |
186 if (!transferable_) | |
187 message->set_route_id(pipe_id_); | |
188 channel_->WriteMessage(std::move(message)); | |
189 return; | |
190 } | |
191 | |
192 if (!transferable_ && | |
193 (non_transferable_state_ == CONNECT_CALLED || | |
194 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { | |
195 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
196 RequestNontransferableChannel(); | |
197 | |
198 // We can't cancel the pending request yet, since the other side of the | |
199 // message pipe would want to get pending outgoing messages (if any) or at | |
200 // least know that this end was closed. So keep this object alive until | |
201 // then. | |
202 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | |
203 AddRef(); | |
204 } | |
205 } | 87 } |
206 | 88 |
207 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 89 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
208 return Type::MESSAGE_PIPE; | 90 return Type::MESSAGE_PIPE; |
209 } | 91 } |
210 | 92 |
211 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | 93 MojoResult MessagePipeDispatcher::Close() { |
212 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | 94 base::AutoLock lock(signal_lock_); |
213 base::AutoLock locker(lock()); | 95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
214 channel_ = channel; | 96 << " [port=" << port_.name() << "]"; |
215 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | 97 return CloseNoLock(); |
216 channel_->WriteMessage( | 98 } |
217 non_transferable_outgoing_message_queue_.GetMessage()); | 99 |
218 } | 100 MojoResult MessagePipeDispatcher::WriteMessage( |
219 | |
220 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { | |
221 non_transferable_state_ = CONNECTED; | |
222 // We kept this object alive until it's connected, we can close it now. | |
223 CloseOnIO(); | |
224 // Balance the AddRef in CloseOnIO. | |
225 Release(); | |
226 return; | |
227 } | |
228 | |
229 non_transferable_state_ = CONNECTED; | |
230 } | |
231 | |
232 #if defined(OS_WIN) | |
233 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | |
234 // best way we want to share this. | |
235 // Since this is used for serialization of messages read/written to a MP that | |
236 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | |
237 // them when a MP is being sent. As a result, even for POSIX we will probably | |
238 // want to send the handles to the shell process and exchange them for tokens | |
239 // (since we can be sure that the shell will respond to our IPCs, compared to | |
240 // the other end where we're sending the MP to, which may not be reading...). | |
241 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | |
242 size_t num_platform_handles, | |
243 const void* platform_handle_table) { | |
244 ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector()); | |
245 rv->resize(num_platform_handles); | |
246 | |
247 const uint64_t* tokens = | |
248 static_cast<const uint64_t*>(platform_handle_table); | |
249 internal::g_broker->TokenToHandle(tokens, num_platform_handles, &rv->at(0)); | |
250 return rv.Pass(); | |
251 } | |
252 #endif | |
253 | |
254 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( | |
255 const void* source, | |
256 size_t size, | |
257 PlatformHandleVector* platform_handles) { | |
258 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { | |
259 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; | |
260 return nullptr; | |
261 } | |
262 | |
263 const SerializedMessagePipeHandleDispatcher* serialization = | |
264 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); | |
265 | |
266 scoped_refptr<MessagePipeDispatcher> rv( | |
267 new MessagePipeDispatcher(serialization->transferable)); | |
268 if (!rv->transferable_) { | |
269 rv->InitNonTransferable(serialization->pipe_id); | |
270 return rv; | |
271 } | |
272 | |
273 if (serialization->shared_memory_size != | |
274 (serialization->serialized_read_buffer_size + | |
275 serialization->serialized_write_buffer_size + | |
276 serialization->serialized_message_queue_size)) { | |
277 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; | |
278 return nullptr; | |
279 } | |
280 | |
281 ScopedPlatformHandle platform_handle, shared_memory_handle; | |
282 if (!GetHandle(serialization->platform_handle_index, | |
283 platform_handles, &platform_handle) || | |
284 !GetHandle(serialization->shared_memory_handle_index, | |
285 platform_handles, &shared_memory_handle)) { | |
286 return nullptr; | |
287 } | |
288 | |
289 char* serialized_read_buffer = nullptr; | |
290 size_t serialized_read_buffer_size = 0; | |
291 char* serialized_write_buffer = nullptr; | |
292 size_t serialized_write_buffer_size = 0; | |
293 char* message_queue_data = nullptr; | |
294 size_t message_queue_size = 0; | |
295 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
296 scoped_ptr<PlatformSharedBufferMapping> mapping; | |
297 if (shared_memory_handle.is_valid()) { | |
298 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | |
299 serialization->shared_memory_size, std::move(shared_memory_handle)); | |
300 mapping = shared_buffer->Map(0, serialization->shared_memory_size); | |
301 char* buffer = static_cast<char*>(mapping->GetBase()); | |
302 if (serialization->serialized_read_buffer_size) { | |
303 serialized_read_buffer = buffer; | |
304 serialized_read_buffer_size = serialization->serialized_read_buffer_size; | |
305 buffer += serialized_read_buffer_size; | |
306 } | |
307 if (serialization->serialized_write_buffer_size) { | |
308 serialized_write_buffer = buffer; | |
309 serialized_write_buffer_size = | |
310 serialization->serialized_write_buffer_size; | |
311 buffer += serialized_write_buffer_size; | |
312 } | |
313 if (serialization->serialized_message_queue_size) { | |
314 message_queue_data = buffer; | |
315 message_queue_size = serialization->serialized_message_queue_size; | |
316 buffer += message_queue_size; | |
317 } | |
318 } | |
319 | |
320 rv->write_error_ = serialization->write_error; | |
321 | |
322 std::vector<int> serialized_read_fds; | |
323 std::vector<int> serialized_write_fds; | |
324 #if defined(OS_POSIX) | |
325 std::vector<int> serialized_fds; | |
326 size_t serialized_fds_index = 0; | |
327 | |
328 size_t total_fd_count = serialization->serialized_read_fds_length + | |
329 serialization->serialized_write_fds_length + | |
330 serialization->serialized_message_fds_length; | |
331 for (size_t i = 0; i < total_fd_count; ++i) { | |
332 ScopedPlatformHandle handle; | |
333 if (!GetHandle(serialization->serialized_fds_index + i, platform_handles, | |
334 &handle)) { | |
335 ClosePlatformHandles(&serialized_fds); | |
336 return nullptr; | |
337 } | |
338 serialized_fds.push_back(handle.release().handle); | |
339 } | |
340 | |
341 serialized_read_fds.assign( | |
342 serialized_fds.begin(), | |
343 serialized_fds.begin() + serialization->serialized_read_fds_length); | |
344 serialized_fds_index += serialization->serialized_read_fds_length; | |
345 serialized_write_fds.assign( | |
346 serialized_fds.begin() + serialized_fds_index, | |
347 serialized_fds.begin() + serialized_fds_index + | |
348 serialization->serialized_write_fds_length); | |
349 serialized_fds_index += serialization->serialized_write_fds_length; | |
350 #endif | |
351 | |
352 while (message_queue_size) { | |
353 size_t message_size; | |
354 if (!MessageInTransit::GetNextMessageSize( | |
355 message_queue_data, message_queue_size, &message_size)) { | |
356 NOTREACHED() << "Couldn't read message size from serialized data."; | |
357 return nullptr; | |
358 } | |
359 if (message_size > message_queue_size) { | |
360 NOTREACHED() << "Invalid serialized message size."; | |
361 return nullptr; | |
362 } | |
363 MessageInTransit::View message_view(message_size, message_queue_data); | |
364 message_queue_size -= message_size; | |
365 message_queue_data += message_size; | |
366 | |
367 // TODO(jam): Copied below from RawChannelWin. See commment above | |
368 // GetReadPlatformHandles. | |
369 ScopedPlatformHandleVectorPtr temp_platform_handles; | |
370 if (message_view.transport_data_buffer()) { | |
371 size_t num_platform_handles; | |
372 const void* platform_handle_table; | |
373 TransportData::GetPlatformHandleTable( | |
374 message_view.transport_data_buffer(), &num_platform_handles, | |
375 &platform_handle_table); | |
376 | |
377 if (num_platform_handles > 0) { | |
378 #if defined(OS_WIN) | |
379 temp_platform_handles = | |
380 GetReadPlatformHandles(num_platform_handles, | |
381 platform_handle_table).Pass(); | |
382 if (!temp_platform_handles) { | |
383 LOG(ERROR) << "Invalid number of platform handles received"; | |
384 return nullptr; | |
385 } | |
386 #else | |
387 temp_platform_handles.reset(new PlatformHandleVector()); | |
388 for (size_t i = 0; i < num_platform_handles; ++i) | |
389 temp_platform_handles->push_back( | |
390 PlatformHandle(serialized_fds[serialized_fds_index++])); | |
391 #endif | |
392 } | |
393 } | |
394 | |
395 // TODO(jam): Copied below from RawChannelWin. See commment above | |
396 // GetReadPlatformHandles. | |
397 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | |
398 if (message_view.transport_data_buffer_size() > 0) { | |
399 DCHECK(message_view.transport_data_buffer()); | |
400 message->SetDispatchers(TransportData::DeserializeDispatchers( | |
401 message_view.transport_data_buffer(), | |
402 message_view.transport_data_buffer_size(), | |
403 std::move(temp_platform_handles))); | |
404 } | |
405 | |
406 rv->message_queue_.AddMessage(std::move(message)); | |
407 } | |
408 | |
409 rv->Init(std::move(platform_handle), serialized_read_buffer, | |
410 serialized_read_buffer_size, serialized_write_buffer, | |
411 serialized_write_buffer_size, &serialized_read_fds, | |
412 &serialized_write_fds); | |
413 | |
414 if (message_queue_size) { // Should be empty by now. | |
415 LOG(ERROR) << "Invalid queued messages"; | |
416 return nullptr; | |
417 } | |
418 | |
419 return rv; | |
420 } | |
421 | |
422 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) | |
423 : channel_(nullptr), | |
424 serialized_read_fds_length_(0u), | |
425 serialized_write_fds_length_(0u), | |
426 serialized_message_fds_length_(0u), | |
427 pipe_id_(0), | |
428 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | |
429 serialized_(false), | |
430 calling_init_(false), | |
431 write_error_(false), | |
432 transferable_(transferable), | |
433 close_requested_(false) { | |
434 } | |
435 | |
436 MessagePipeDispatcher::~MessagePipeDispatcher() { | |
437 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | |
438 // exception is if they posted a task to run CloseOnIO but the IO thread shut | |
439 // down and so when it was deleting pending tasks it caused the last reference | |
440 // to destruct this object. In that case, safe to destroy the channel. | |
441 if (channel_ && | |
442 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { | |
443 if (transferable_) { | |
444 channel_->Shutdown(); | |
445 } else { | |
446 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
447 } | |
448 } else { | |
449 DCHECK(!channel_); | |
450 } | |
451 #if defined(OS_POSIX) | |
452 ClosePlatformHandles(&serialized_fds_); | |
453 #endif | |
454 } | |
455 | |
456 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { | |
457 lock().AssertAcquired(); | |
458 awakable_list_.CancelAll(); | |
459 } | |
460 | |
461 void MessagePipeDispatcher::CloseImplNoLock() { | |
462 lock().AssertAcquired(); | |
463 // This early circuit fixes leak in unit tests. There's nothing to do in the | |
464 // posted task. | |
465 if (!transferable_ && non_transferable_state_ == CLOSED) | |
466 return; | |
467 | |
468 // We take a manual refcount because at shutdown, the task below might not get | |
469 // a chance to execute. If that happens, the RawChannel will still call our | |
470 // OnError method because it always runs (since it watches thread | |
471 // destruction). So to avoid UAF, manually add a reference and only release it | |
472 // if the task runs. | |
473 AddRef(); | |
474 if (!internal::g_io_thread_task_runner->PostTask( | |
475 FROM_HERE, | |
476 base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this))) { | |
477 // Avoid a shutdown leak in unittests. If the thread is shutting down, | |
478 // we can't connect to the other end to let it know that we're closed either | |
479 // way. | |
480 if (!transferable_ && non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
481 Release(); | |
482 } | |
483 } | |
484 | |
485 void MessagePipeDispatcher::SerializeInternal() { | |
486 serialized_ = true; | |
487 if (!transferable_) { | |
488 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
489 << "Non transferable message pipe being sent after read/write/waited. " | |
490 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | |
491 << "the pipe can be sent after it's read or written. This message pipe " | |
492 << "was previously bound at:\n" | |
493 << non_transferable_bound_stack_->ToString(); | |
494 | |
495 non_transferable_state_ = SERIALISED; | |
496 return; | |
497 } | |
498 | |
499 // We need to stop watching handle immediately, even though not on IO thread, | |
500 // so that other messages aren't read after this. | |
501 std::vector<int> serialized_read_fds, serialized_write_fds; | |
502 if (channel_) { | |
503 bool write_error = false; | |
504 | |
505 serialized_platform_handle_ = channel_->ReleaseHandle( | |
506 &serialized_read_buffer_, &serialized_write_buffer_, | |
507 &serialized_read_fds, &serialized_write_fds, &write_error); | |
508 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), | |
509 serialized_read_fds.end()); | |
510 serialized_read_fds_length_ = serialized_read_fds.size(); | |
511 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), | |
512 serialized_write_fds.end()); | |
513 serialized_write_fds_length_ = serialized_write_fds.size(); | |
514 channel_ = nullptr; | |
515 } else { | |
516 // It's valid that the other side wrote some data and closed its end. | |
517 } | |
518 | |
519 DCHECK(serialized_message_queue_.empty()); | |
520 while (!message_queue_.IsEmpty()) { | |
521 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); | |
522 | |
523 // When MojoWriteMessage is called, the MessageInTransit doesn't have | |
524 // dispatchers set and CreateEquivaent... is called since the dispatchers | |
525 // can be referenced by others. here dispatchers aren't referenced by | |
526 // others, but rawchannel can still call to them. so since we dont call | |
527 // createequiv, manually call TransportStarted and TransportEnd. | |
528 DispatcherVector dispatchers; | |
529 if (message->has_dispatchers()) | |
530 dispatchers = *message->dispatchers(); | |
531 for (size_t i = 0; i < dispatchers.size(); ++i) | |
532 dispatchers[i]->TransportStarted(); | |
533 | |
534 // TODO(jam): this handling for dispatchers only works on windows where we | |
535 // send transportdata as bytes instead of as parameters to sendmsg. | |
536 message->SerializeAndCloseDispatchers(); | |
537 // cont'd below | |
538 | |
539 size_t main_buffer_size = message->main_buffer_size(); | |
540 size_t transport_data_buffer_size = message->transport_data() ? | |
541 message->transport_data()->buffer_size() : 0; | |
542 | |
543 serialized_message_queue_.insert( | |
544 serialized_message_queue_.end(), | |
545 static_cast<const char*>(message->main_buffer()), | |
546 static_cast<const char*>(message->main_buffer()) + main_buffer_size); | |
547 | |
548 // cont'd | |
549 if (transport_data_buffer_size != 0) { | |
550 // TODO(jam): copied from RawChannelWin::WriteNoLock( | |
551 PlatformHandleVector* all_platform_handles = | |
552 message->transport_data()->platform_handles(); | |
553 if (all_platform_handles) { | |
554 #if defined(OS_WIN) | |
555 uint64_t* tokens = reinterpret_cast<uint64_t*>( | |
556 static_cast<char*>(message->transport_data()->buffer()) + | |
557 message->transport_data()->platform_handle_table_offset()); | |
558 internal::g_broker->HandleToToken( | |
559 &all_platform_handles->at(0), all_platform_handles->size(), tokens); | |
560 for (size_t i = 0; i < all_platform_handles->size(); i++) | |
561 all_platform_handles->at(i) = PlatformHandle(); | |
562 #else | |
563 for (size_t i = 0; i < all_platform_handles->size(); i++) { | |
564 serialized_fds_.push_back(all_platform_handles->at(i).handle); | |
565 serialized_message_fds_length_++; | |
566 all_platform_handles->at(i) = PlatformHandle(); | |
567 } | |
568 #endif | |
569 } | |
570 | |
571 serialized_message_queue_.insert( | |
572 serialized_message_queue_.end(), | |
573 static_cast<const char*>(message->transport_data()->buffer()), | |
574 static_cast<const char*>(message->transport_data()->buffer()) + | |
575 transport_data_buffer_size); | |
576 } | |
577 | |
578 for (size_t i = 0; i < dispatchers.size(); ++i) | |
579 dispatchers[i]->TransportEnded(); | |
580 } | |
581 } | |
582 | |
583 scoped_refptr<Dispatcher> | |
584 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
585 lock().AssertAcquired(); | |
586 | |
587 SerializeInternal(); | |
588 | |
589 scoped_refptr<MessagePipeDispatcher> rv( | |
590 new MessagePipeDispatcher(transferable_)); | |
591 rv->serialized_ = true; | |
592 if (transferable_) { | |
593 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); | |
594 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
595 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
596 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
597 serialized_fds_.swap(rv->serialized_fds_); | |
598 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | |
599 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | |
600 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | |
601 rv->write_error_ = write_error_; | |
602 } else { | |
603 rv->pipe_id_ = pipe_id_; | |
604 rv->non_transferable_state_ = non_transferable_state_; | |
605 } | |
606 return rv; | |
607 } | |
608 | |
609 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | |
610 const void* bytes, | 101 const void* bytes, |
611 uint32_t num_bytes, | 102 uint32_t num_bytes, |
612 std::vector<DispatcherTransport>* transports, | 103 const DispatcherInTransit* dispatchers, |
| 104 uint32_t num_dispatchers, |
613 MojoWriteMessageFlags flags) { | 105 MojoWriteMessageFlags flags) { |
614 lock().AssertAcquired(); | 106 |
615 | 107 { |
616 DCHECK(!transports || | 108 base::AutoLock lock(signal_lock_); |
617 (transports->size() > 0 && | 109 if (port_closed_ || in_transit_) |
618 transports->size() <= GetConfiguration().max_message_num_handles)); | 110 return MOJO_RESULT_INVALID_ARGUMENT; |
619 | 111 } |
620 if (write_error_ || | 112 |
621 (transferable_ && !channel_) || | 113 // A structure for retaining information about every Dispatcher we're about |
622 (!transferable_ && non_transferable_state_ == CLOSED)) { | 114 // to send. This information is collected by calling StartSerialize() on |
| 115 // each dispatcher in sequence. |
| 116 struct DispatcherInfo { |
| 117 uint32_t num_bytes; |
| 118 uint32_t num_ports; |
| 119 uint32_t num_handles; |
| 120 }; |
| 121 |
| 122 // This is only the base header size. It will grow as we accumulate the |
| 123 // size of serialized state for each dispatcher. |
| 124 size_t header_size = sizeof(MessageHeader) + |
| 125 num_dispatchers * sizeof(DispatcherHeader); |
| 126 |
| 127 size_t num_ports = 0; |
| 128 size_t num_handles = 0; |
| 129 |
| 130 std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); |
| 131 for (size_t i = 0; i < num_dispatchers; ++i) { |
| 132 Dispatcher* d = dispatchers[i].dispatcher.get(); |
| 133 d->StartSerialize(&dispatcher_info[i].num_bytes, |
| 134 &dispatcher_info[i].num_ports, |
| 135 &dispatcher_info[i].num_handles); |
| 136 header_size += dispatcher_info[i].num_bytes; |
| 137 num_ports += dispatcher_info[i].num_ports; |
| 138 num_handles += dispatcher_info[i].num_handles; |
| 139 } |
| 140 |
| 141 // We now have enough information to fully allocate the message storage. |
| 142 scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage( |
| 143 header_size + num_bytes, num_ports, num_handles); |
| 144 DCHECK(message); |
| 145 |
| 146 // Populate the message header with information about serialized dispatchers. |
| 147 // |
| 148 // The front of the message is always a MessageHeader followed by a |
| 149 // DispatcherHeader for each dispatcher to be sent. |
| 150 MessageHeader* header = |
| 151 static_cast<MessageHeader*>(message->mutable_payload_bytes()); |
| 152 DispatcherHeader* dispatcher_headers = |
| 153 reinterpret_cast<DispatcherHeader*>(header + 1); |
| 154 |
| 155 // Serialized dispatcher state immediately follows the series of |
| 156 // DispatcherHeaders. |
| 157 char* dispatcher_data = |
| 158 reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); |
| 159 |
| 160 header->num_dispatchers = num_dispatchers; |
| 161 |
| 162 // |header_size| is the total number of bytes preceding the message payload, |
| 163 // including all dispatcher headers and serialized dispatcher state. |
| 164 DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); |
| 165 header->header_size = static_cast<uint32_t>(header_size); |
| 166 |
| 167 bool cancel_transit = false; |
| 168 if (num_dispatchers > 0) { |
| 169 ScopedPlatformHandleVectorPtr handles( |
| 170 new PlatformHandleVector(num_handles)); |
| 171 size_t port_index = 0; |
| 172 size_t handle_index = 0; |
| 173 for (size_t i = 0; i < num_dispatchers; ++i) { |
| 174 Dispatcher* d = dispatchers[i].dispatcher.get(); |
| 175 DispatcherHeader* dh = &dispatcher_headers[i]; |
| 176 const DispatcherInfo& info = dispatcher_info[i]; |
| 177 |
| 178 // Fill in the header for this dispatcher. |
| 179 dh->type = static_cast<int32_t>(d->GetType()); |
| 180 dh->num_bytes = info.num_bytes; |
| 181 dh->num_ports = info.num_ports; |
| 182 dh->num_platform_handles = info.num_handles; |
| 183 |
| 184 // Fill in serialized state, ports, and platform handles. We'll cancel |
| 185 // the send if the dispatcher implementation rejects for some reason. |
| 186 if (!d->EndSerialize(static_cast<void*>(dispatcher_data), |
| 187 message->mutable_ports() + port_index, |
| 188 handles->data() + handle_index)) { |
| 189 cancel_transit = true; |
| 190 break; |
| 191 } |
| 192 |
| 193 dispatcher_data += info.num_bytes; |
| 194 port_index += info.num_ports; |
| 195 handle_index += info.num_handles; |
| 196 } |
| 197 |
| 198 if (!cancel_transit) { |
| 199 // Take ownership of all the handles and move them into message storage. |
| 200 message->SetHandles(std::move(handles)); |
| 201 } else { |
| 202 // Release any platform handles we've accumulated. Their dispatchers |
| 203 // retain ownership when transit is canceled, so these are not actually |
| 204 // leaking. |
| 205 handles->clear(); |
| 206 } |
| 207 } |
| 208 |
| 209 MojoResult result = MOJO_RESULT_OK; |
| 210 if (!cancel_transit) { |
| 211 // Copy the message body. |
| 212 void* message_body = static_cast<void*>( |
| 213 static_cast<char*>(message->mutable_payload_bytes()) + header_size); |
| 214 memcpy(message_body, bytes, num_bytes); |
| 215 |
| 216 int rv = node_controller_->SendMessage(port_, &message); |
| 217 |
| 218 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
| 219 << " [port=" << port_.name() << "; rv=" << rv |
| 220 << "; num_bytes=" << num_bytes << "]"; |
| 221 |
| 222 if (rv != ports::OK) { |
| 223 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 224 rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
| 225 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
| 226 result = MOJO_RESULT_INVALID_ARGUMENT; |
| 227 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
| 228 base::AutoLock lock(signal_lock_); |
| 229 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 230 result = MOJO_RESULT_FAILED_PRECONDITION; |
| 231 } else { |
| 232 NOTREACHED(); |
| 233 result = MOJO_RESULT_UNKNOWN; |
| 234 } |
| 235 cancel_transit = true; |
| 236 } else { |
| 237 DCHECK(!message); |
| 238 } |
| 239 } |
| 240 |
| 241 if (cancel_transit) { |
| 242 // We ended up not sending the message. Release all the platform handles. |
| 243 // Their dipatchers retain ownership when transit is canceled, so these are |
| 244 // not actually leaking. |
| 245 DCHECK(message); |
| 246 Channel::MessagePtr m = message->TakeChannelMessage(); |
| 247 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); |
| 248 if (handles) |
| 249 handles->clear(); |
| 250 } |
| 251 |
| 252 return result; |
| 253 } |
| 254 |
| 255 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| 256 uint32_t* num_bytes, |
| 257 MojoHandle* handles, |
| 258 uint32_t* num_handles, |
| 259 MojoReadMessageFlags flags) { |
| 260 { |
| 261 base::AutoLock lock(signal_lock_); |
| 262 // We can't read from a port that's closed or in transit! |
| 263 if (port_closed_ || in_transit_) |
| 264 return MOJO_RESULT_INVALID_ARGUMENT; |
| 265 } |
| 266 |
| 267 bool no_space = false; |
| 268 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
| 269 |
| 270 // Ensure the provided buffers are large enough to hold the next message. |
| 271 // GetMessageIf provides an atomic way to test the next message without |
| 272 // committing to removing it from the port's underlying message queue until |
| 273 // we are sure we can consume it. |
| 274 |
| 275 ports::ScopedMessage ports_message; |
| 276 int rv = node_controller_->node()->GetMessageIf( |
| 277 port_, |
| 278 [num_bytes, num_handles, &no_space, &may_discard]( |
| 279 const ports::Message& next_message) { |
| 280 const PortsMessage& message = |
| 281 static_cast<const PortsMessage&>(next_message); |
| 282 DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); |
| 283 |
| 284 const MessageHeader* header = |
| 285 static_cast<const MessageHeader*>(message.payload_bytes()); |
| 286 DCHECK_LE(header->header_size, message.num_payload_bytes()); |
| 287 |
| 288 uint32_t bytes_to_read = 0; |
| 289 uint32_t bytes_available = |
| 290 static_cast<uint32_t>(message.num_payload_bytes()) - |
| 291 header->header_size; |
| 292 if (num_bytes) { |
| 293 bytes_to_read = std::min(*num_bytes, bytes_available); |
| 294 *num_bytes = bytes_available; |
| 295 } |
| 296 |
| 297 uint32_t handles_to_read = 0; |
| 298 uint32_t handles_available = header->num_dispatchers; |
| 299 if (num_handles) { |
| 300 handles_to_read = std::min(*num_handles, handles_available); |
| 301 *num_handles = handles_available; |
| 302 } |
| 303 |
| 304 if (bytes_to_read < bytes_available || |
| 305 handles_to_read < handles_available) { |
| 306 no_space = true; |
| 307 return may_discard; |
| 308 } |
| 309 |
| 310 return true; |
| 311 }, |
| 312 &ports_message); |
| 313 |
| 314 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
| 315 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 316 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
| 317 return MOJO_RESULT_INVALID_ARGUMENT; |
| 318 |
| 319 NOTREACHED(); |
| 320 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
| 321 } |
| 322 |
| 323 if (no_space) { |
| 324 // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this |
| 325 // message's data. The message will still be in queue unless |
| 326 // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
| 327 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 328 } |
| 329 |
| 330 if (!ports_message) { |
| 331 // No message was available in queue. |
| 332 |
| 333 if (rv == ports::OK) |
| 334 return MOJO_RESULT_SHOULD_WAIT; |
| 335 |
| 336 // Peer is closed and there are no more messages to read. |
| 337 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
| 338 base::AutoLock lock(signal_lock_); |
| 339 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
623 return MOJO_RESULT_FAILED_PRECONDITION; | 340 return MOJO_RESULT_FAILED_PRECONDITION; |
624 } | 341 } |
625 | 342 |
626 if (num_bytes > GetConfiguration().max_message_num_bytes) | 343 // Alright! We have a message and the caller has provided sufficient storage |
627 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 344 // in which to receive it. |
628 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 345 |
629 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); | 346 scoped_ptr<PortsMessage> message( |
630 if (transports) { | 347 static_cast<PortsMessage*>(ports_message.release())); |
631 MojoResult result = AttachTransportsNoLock(message.get(), transports); | 348 |
632 if (result != MOJO_RESULT_OK) | 349 const MessageHeader* header = |
633 return result; | 350 static_cast<const MessageHeader*>(message->payload_bytes()); |
634 } | 351 const DispatcherHeader* dispatcher_headers = |
635 | 352 reinterpret_cast<const DispatcherHeader*>(header + 1); |
636 message->SerializeAndCloseDispatchers(); | 353 |
637 if (!transferable_) | 354 const char* dispatcher_data = reinterpret_cast<const char*>( |
638 message->set_route_id(pipe_id_); | 355 dispatcher_headers + header->num_dispatchers); |
639 if (!transferable_ && | 356 |
640 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || | 357 // Deserialize dispatchers. |
641 non_transferable_state_ == CONNECT_CALLED)) { | 358 if (header->num_dispatchers > 0) { |
642 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 359 CHECK(handles); |
643 RequestNontransferableChannel(); | 360 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); |
644 non_transferable_outgoing_message_queue_.AddMessage(std::move(message)); | 361 size_t data_payload_index = sizeof(MessageHeader) + |
645 } else { | 362 header->num_dispatchers * sizeof(DispatcherHeader); |
646 channel_->WriteMessage(std::move(message)); | 363 size_t port_index = 0; |
647 } | 364 size_t platform_handle_index = 0; |
| 365 for (size_t i = 0; i < header->num_dispatchers; ++i) { |
| 366 const DispatcherHeader& dh = dispatcher_headers[i]; |
| 367 Type type = static_cast<Type>(dh.type); |
| 368 |
| 369 DCHECK_GE(message->num_payload_bytes(), |
| 370 data_payload_index + dh.num_bytes); |
| 371 DCHECK_GE(message->num_ports(), |
| 372 port_index + dh.num_ports); |
| 373 DCHECK_GE(message->num_handles(), |
| 374 platform_handle_index + dh.num_platform_handles); |
| 375 |
| 376 PlatformHandle* out_handles = |
| 377 message->num_handles() ? message->handles() + platform_handle_index |
| 378 : nullptr; |
| 379 dispatchers[i].dispatcher = Dispatcher::Deserialize( |
| 380 type, dispatcher_data, dh.num_bytes, message->ports() + port_index, |
| 381 dh.num_ports, out_handles, dh.num_platform_handles); |
| 382 if (!dispatchers[i].dispatcher) |
| 383 return MOJO_RESULT_UNKNOWN; |
| 384 |
| 385 dispatcher_data += dh.num_bytes; |
| 386 data_payload_index += dh.num_bytes; |
| 387 port_index += dh.num_ports; |
| 388 platform_handle_index += dh.num_platform_handles; |
| 389 } |
| 390 |
| 391 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, |
| 392 handles)) |
| 393 return MOJO_RESULT_UNKNOWN; |
| 394 } |
| 395 |
| 396 // Copy message bytes. |
| 397 DCHECK_GE(message->num_payload_bytes(), header->header_size); |
| 398 const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); |
| 399 memcpy(bytes, payload + header->header_size, |
| 400 message->num_payload_bytes() - header->header_size); |
648 | 401 |
649 return MOJO_RESULT_OK; | 402 return MOJO_RESULT_OK; |
650 } | 403 } |
651 | 404 |
652 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 405 HandleSignalsState |
653 void* bytes, | 406 MessagePipeDispatcher::GetHandleSignalsState() const { |
654 uint32_t* num_bytes, | 407 base::AutoLock lock(signal_lock_); |
655 DispatcherVector* dispatchers, | 408 return GetHandleSignalsStateNoLock(); |
656 uint32_t* num_dispatchers, | 409 } |
657 MojoReadMessageFlags flags) { | 410 |
658 lock().AssertAcquired(); | 411 MojoResult MessagePipeDispatcher::AddAwakable( |
659 if (transferable_ && channel_) { | |
660 channel_->EnsureLazyInitialized(); | |
661 } else if (!transferable_) { | |
662 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
663 RequestNontransferableChannel(); | |
664 return MOJO_RESULT_SHOULD_WAIT; | |
665 } else if (non_transferable_state_ == CONNECT_CALLED) { | |
666 return MOJO_RESULT_SHOULD_WAIT; | |
667 } | |
668 } | |
669 | |
670 DCHECK(!dispatchers || dispatchers->empty()); | |
671 | |
672 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; | |
673 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | |
674 | |
675 if (message_queue_.IsEmpty()) | |
676 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; | |
677 | |
678 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | |
679 // and release the lock immediately. | |
680 bool enough_space = true; | |
681 MessageInTransit* message = message_queue_.PeekMessage(); | |
682 if (num_bytes) | |
683 *num_bytes = message->num_bytes(); | |
684 if (message->num_bytes() <= max_bytes) | |
685 memcpy(bytes, message->bytes(), message->num_bytes()); | |
686 else | |
687 enough_space = false; | |
688 | |
689 if (DispatcherVector* queued_dispatchers = message->dispatchers()) { | |
690 if (num_dispatchers) | |
691 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); | |
692 if (enough_space) { | |
693 if (queued_dispatchers->empty()) { | |
694 // Nothing to do. | |
695 } else if (queued_dispatchers->size() <= max_num_dispatchers) { | |
696 DCHECK(dispatchers); | |
697 dispatchers->swap(*queued_dispatchers); | |
698 } else { | |
699 enough_space = false; | |
700 } | |
701 } | |
702 } else { | |
703 if (num_dispatchers) | |
704 *num_dispatchers = 0; | |
705 } | |
706 | |
707 message = nullptr; | |
708 | |
709 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { | |
710 message_queue_.DiscardMessage(); | |
711 | |
712 // Now it's empty, thus no longer readable. | |
713 if (message_queue_.IsEmpty()) { | |
714 // It's currently not possible to wait for non-readability, but we should | |
715 // do the state change anyway. | |
716 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
717 } | |
718 } | |
719 | |
720 if (!enough_space) | |
721 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
722 | |
723 return MOJO_RESULT_OK; | |
724 } | |
725 | |
726 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | |
727 const { | |
728 lock().AssertAcquired(); | |
729 | |
730 HandleSignalsState rv; | |
731 if (!message_queue_.IsEmpty()) | |
732 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
733 if (!message_queue_.IsEmpty() || | |
734 (transferable_ && channel_) || | |
735 (!transferable_ && non_transferable_state_ != CLOSED)) | |
736 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
737 if (!write_error_ && | |
738 ((transferable_ && channel_) || | |
739 (!transferable_ && non_transferable_state_ != CLOSED))) { | |
740 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
741 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
742 } | |
743 if (write_error_ || | |
744 (transferable_ && !channel_) || | |
745 (!transferable_ && | |
746 ((non_transferable_state_ == CLOSED) || is_closed()))) { | |
747 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
748 } | |
749 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
750 return rv; | |
751 } | |
752 | |
753 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( | |
754 Awakable* awakable, | 412 Awakable* awakable, |
755 MojoHandleSignals signals, | 413 MojoHandleSignals signals, |
756 uintptr_t context, | 414 uintptr_t context, |
757 HandleSignalsState* signals_state) { | 415 HandleSignalsState* signals_state) { |
758 lock().AssertAcquired(); | 416 base::AutoLock lock(signal_lock_); |
759 if (transferable_ && channel_) { | 417 |
760 channel_->EnsureLazyInitialized(); | 418 if (port_closed_ || in_transit_) { |
761 } else if (!transferable_ && | 419 if (signals_state) |
762 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | 420 *signals_state = HandleSignalsState(); |
763 RequestNontransferableChannel(); | 421 return MOJO_RESULT_INVALID_ARGUMENT; |
764 } | 422 } |
765 | 423 |
766 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 424 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 425 |
| 426 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint " |
| 427 << endpoint_ << " [awakable=" << awakable << "; port=" |
| 428 << port_.name() << "; signals=" << signals << "; satisfied=" |
| 429 << state.satisfied_signals << "; satisfiable=" |
| 430 << state.satisfiable_signals << "]"; |
| 431 |
767 if (state.satisfies(signals)) { | 432 if (state.satisfies(signals)) { |
768 if (signals_state) | 433 if (signals_state) |
769 *signals_state = state; | 434 *signals_state = state; |
| 435 DVLOG(2) << "Signals already set for " << port_.name(); |
770 return MOJO_RESULT_ALREADY_EXISTS; | 436 return MOJO_RESULT_ALREADY_EXISTS; |
771 } | 437 } |
772 if (!state.can_satisfy(signals)) { | 438 if (!state.can_satisfy(signals)) { |
773 if (signals_state) | 439 if (signals_state) |
774 *signals_state = state; | 440 *signals_state = state; |
| 441 DVLOG(2) << "Signals impossible to satisfy for " << port_.name(); |
775 return MOJO_RESULT_FAILED_PRECONDITION; | 442 return MOJO_RESULT_FAILED_PRECONDITION; |
776 } | 443 } |
777 | 444 |
778 awakable_list_.Add(awakable, signals, context); | 445 DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint " |
| 446 << endpoint_ << " [awakable=" << awakable << "; port=" |
| 447 << port_.name() << "; signals=" << signals << "]"; |
| 448 |
| 449 awakables_.Add(awakable, signals, context); |
779 return MOJO_RESULT_OK; | 450 return MOJO_RESULT_OK; |
780 } | 451 } |
781 | 452 |
782 void MessagePipeDispatcher::RemoveAwakableImplNoLock( | 453 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable, |
783 Awakable* awakable, | 454 HandleSignalsState* signals_state) { |
784 HandleSignalsState* signals_state) { | 455 base::AutoLock lock(signal_lock_); |
785 lock().AssertAcquired(); | 456 if (port_closed_ || in_transit_) { |
786 | 457 if (signals_state) |
787 awakable_list_.Remove(awakable); | 458 *signals_state = HandleSignalsState(); |
788 if (signals_state) | 459 } else if (signals_state) { |
789 *signals_state = GetHandleSignalsStateImplNoLock(); | 460 *signals_state = GetHandleSignalsStateNoLock(); |
790 } | 461 } |
791 | 462 |
792 void MessagePipeDispatcher::StartSerializeImplNoLock( | 463 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint " |
793 size_t* max_size, | 464 << endpoint_ << " [awakable=" << awakable << "; port=" |
794 size_t* max_platform_handles) { | 465 << port_.name() << "]"; |
795 if (!serialized_) | 466 |
796 SerializeInternal(); | 467 awakables_.Remove(awakable); |
797 | 468 } |
798 *max_platform_handles = 0; | 469 |
799 if (serialized_platform_handle_.is_valid()) | 470 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes, |
800 (*max_platform_handles)++; | 471 uint32_t* num_ports, |
801 if (!serialized_read_buffer_.empty() || | 472 uint32_t* num_handles) { |
802 !serialized_write_buffer_.empty() || | 473 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); |
803 !serialized_message_queue_.empty()) | 474 *num_ports = 1; |
804 (*max_platform_handles)++; | 475 *num_handles = 0; |
805 *max_platform_handles += serialized_fds_.size(); | 476 } |
806 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); | 477 |
807 } | 478 bool MessagePipeDispatcher::EndSerialize(void* destination, |
808 | 479 ports::PortName* ports, |
809 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | 480 PlatformHandle* handles) { |
810 void* destination, | 481 SerializedState* state = static_cast<SerializedState*>(destination); |
811 size_t* actual_size, | 482 state->pipe_id = pipe_id_; |
812 PlatformHandleVector* platform_handles) { | 483 state->endpoint = static_cast<int8_t>(endpoint_); |
813 CloseImplNoLock(); | 484 ports[0] = port_.name(); |
814 SerializedMessagePipeHandleDispatcher* serialization = | 485 return true; |
815 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); | 486 } |
816 serialization->transferable = transferable_; | 487 |
817 serialization->pipe_id = pipe_id_; | 488 bool MessagePipeDispatcher::BeginTransit() { |
818 if (serialized_platform_handle_.is_valid()) { | 489 base::AutoLock lock(signal_lock_); |
819 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | 490 if (in_transit_) |
820 serialization->platform_handle_index = | 491 return false; |
821 static_cast<uint32_t>(platform_handles->size()); | 492 in_transit_ = true; |
822 platform_handles->push_back(serialized_platform_handle_.release()); | 493 return in_transit_; |
| 494 } |
| 495 |
| 496 void MessagePipeDispatcher::CompleteTransitAndClose() { |
| 497 node_controller_->SetPortObserver(port_, nullptr); |
| 498 |
| 499 base::AutoLock lock(signal_lock_); |
| 500 in_transit_ = false; |
| 501 port_transferred_ = true; |
| 502 CloseNoLock(); |
| 503 } |
| 504 |
| 505 void MessagePipeDispatcher::CancelTransit() { |
| 506 base::AutoLock lock(signal_lock_); |
| 507 in_transit_ = false; |
| 508 |
| 509 // Something may have happened while we were waiting for potential transit. |
| 510 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 511 } |
| 512 |
| 513 // static |
| 514 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
| 515 const void* data, |
| 516 size_t num_bytes, |
| 517 const ports::PortName* ports, |
| 518 size_t num_ports, |
| 519 PlatformHandle* handles, |
| 520 size_t num_handles) { |
| 521 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) |
| 522 return nullptr; |
| 523 |
| 524 const SerializedState* state = static_cast<const SerializedState*>(data); |
| 525 |
| 526 ports::PortRef port; |
| 527 CHECK_EQ( |
| 528 ports::OK, |
| 529 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); |
| 530 |
| 531 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, |
| 532 state->pipe_id, state->endpoint); |
| 533 } |
| 534 |
| 535 MessagePipeDispatcher::~MessagePipeDispatcher() { |
| 536 DCHECK(port_closed_ && !in_transit_); |
| 537 } |
| 538 |
| 539 MojoResult MessagePipeDispatcher::CloseNoLock() { |
| 540 signal_lock_.AssertAcquired(); |
| 541 if (port_closed_ || in_transit_) |
| 542 return MOJO_RESULT_INVALID_ARGUMENT; |
| 543 |
| 544 port_closed_ = true; |
| 545 awakables_.CancelAll(); |
| 546 |
| 547 if (!port_transferred_) { |
| 548 base::AutoUnlock unlock(signal_lock_); |
| 549 node_controller_->ClosePort(port_); |
| 550 } |
| 551 |
| 552 return MOJO_RESULT_OK; |
| 553 } |
| 554 |
| 555 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
| 556 HandleSignalsState rv; |
| 557 |
| 558 ports::PortStatus port_status; |
| 559 if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) { |
| 560 CHECK(in_transit_ || port_transferred_ || port_closed_); |
| 561 return HandleSignalsState(); |
| 562 } |
| 563 |
| 564 if (port_status.has_messages) { |
| 565 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 566 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 567 } |
| 568 if (port_status.receiving_messages) |
| 569 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 570 if (!port_status.peer_closed) { |
| 571 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 572 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 573 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
823 } else { | 574 } else { |
824 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; | 575 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
825 } | 576 } |
826 | 577 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
827 serialization->write_error = write_error_; | 578 return rv; |
828 DCHECK(serialized_read_buffer_.size() < std::numeric_limits<uint32_t>::max()); | 579 } |
829 serialization->serialized_read_buffer_size = | 580 |
830 static_cast<uint32_t>(serialized_read_buffer_.size()); | 581 void MessagePipeDispatcher::OnPortStatusChanged() { |
831 DCHECK(serialized_write_buffer_.size() < | 582 base::AutoLock lock(signal_lock_); |
832 std::numeric_limits<uint32_t>::max()); | 583 |
833 serialization->serialized_write_buffer_size = | 584 // We stop observing our port as soon as it's transferred, but this can race |
834 static_cast<uint32_t>(serialized_write_buffer_.size()); | 585 // with events which are raised right before that happens. This is fine to |
835 DCHECK(serialized_message_queue_.size() < | 586 // ignore. |
836 std::numeric_limits<uint32_t>::max()); | 587 if (port_transferred_) |
837 serialization->serialized_message_queue_size = | 588 return; |
838 static_cast<uint32_t>(serialized_message_queue_.size()); | 589 |
839 | 590 #if !defined(NDEBUG) |
840 serialization->shared_memory_size = static_cast<uint32_t>( | 591 ports::PortStatus port_status; |
841 serialization->serialized_read_buffer_size + | 592 node_controller_->node()->GetStatus(port_, &port_status); |
842 serialization->serialized_write_buffer_size + | 593 if (port_status.has_messages) { |
843 serialization->serialized_message_queue_size); | 594 ports::ScopedMessage unused; |
844 if (serialization->shared_memory_size) { | 595 size_t message_size = 0; |
845 scoped_refptr<PlatformSharedBuffer> shared_buffer( | 596 node_controller_->node()->GetMessageIf( |
846 internal::g_platform_support->CreateSharedBuffer( | 597 port_, [&message_size](const ports::Message& message) { |
847 serialization->shared_memory_size)); | 598 message_size = message.num_payload_bytes(); |
848 scoped_ptr<PlatformSharedBufferMapping> mapping( | 599 return false; |
849 shared_buffer->Map(0, serialization->shared_memory_size)); | 600 }, &unused); |
850 char* start = static_cast<char*>(mapping->GetBase()); | 601 DVLOG(1) << "New message detected on message pipe " << pipe_id_ |
851 start = SerializeBuffer(start, &serialized_read_buffer_); | 602 << " endpoint " << endpoint_ << " [port=" << port_.name() |
852 start = SerializeBuffer(start, &serialized_write_buffer_); | 603 << "; size=" << message_size << "]"; |
853 start = SerializeBuffer(start, &serialized_message_queue_); | 604 } |
854 | 605 if (port_status.peer_closed) { |
855 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | 606 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
856 serialization->shared_memory_handle_index = | 607 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
857 static_cast<uint32_t>(platform_handles->size()); | 608 } |
858 platform_handles->push_back(shared_buffer->PassPlatformHandle().release()); | |
859 } else { | |
860 serialization->shared_memory_handle_index = kInvalidMessagePipeHandleIndex; | |
861 } | |
862 | |
863 DCHECK(serialized_read_fds_length_ < std::numeric_limits<uint32_t>::max()); | |
864 serialization->serialized_read_fds_length = | |
865 static_cast<uint32_t>(serialized_read_fds_length_); | |
866 DCHECK(serialized_write_fds_length_ < std::numeric_limits<uint32_t>::max()); | |
867 serialization->serialized_write_fds_length = | |
868 static_cast<uint32_t>(serialized_write_fds_length_); | |
869 DCHECK(serialized_message_fds_length_ < std::numeric_limits<uint32_t>::max()); | |
870 serialization->serialized_message_fds_length = | |
871 static_cast<uint32_t>(serialized_message_fds_length_); | |
872 if (serialized_fds_.empty()) { | |
873 serialization->serialized_fds_index = kInvalidMessagePipeHandleIndex; | |
874 } else { | |
875 #if defined(OS_POSIX) | |
876 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | |
877 serialization->serialized_fds_index = | |
878 static_cast<uint32_t>(platform_handles->size()); | |
879 for (size_t i = 0; i < serialized_fds_.size(); ++i) | |
880 platform_handles->push_back(PlatformHandle(serialized_fds_[i])); | |
881 serialized_fds_.clear(); | |
882 #endif | 609 #endif |
883 } | 610 |
884 | 611 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
885 *actual_size = sizeof(SerializedMessagePipeHandleDispatcher); | |
886 return true; | |
887 } | |
888 | |
889 void MessagePipeDispatcher::TransportStarted() { | |
890 started_transport_.Acquire(); | |
891 } | |
892 | |
893 void MessagePipeDispatcher::TransportEnded() { | |
894 started_transport_.Release(); | |
895 | |
896 base::AutoLock locker(lock()); | |
897 | |
898 // If transporting of MPD failed, we might have got more data and didn't | |
899 // awake for. | |
900 // TODO(jam): should we care about only alerting if it was empty before | |
901 // TransportStarted? | |
902 if (!message_queue_.IsEmpty()) | |
903 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
904 } | |
905 | |
906 void MessagePipeDispatcher::OnReadMessage( | |
907 const MessageInTransit::View& message_view, | |
908 ScopedPlatformHandleVectorPtr platform_handles) { | |
909 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | |
910 if (message_view.transport_data_buffer_size() > 0) { | |
911 DCHECK(message_view.transport_data_buffer()); | |
912 message->SetDispatchers(TransportData::DeserializeDispatchers( | |
913 message_view.transport_data_buffer(), | |
914 message_view.transport_data_buffer_size(), | |
915 std::move(platform_handles))); | |
916 } | |
917 | |
918 bool call_release = false; | |
919 if (started_transport_.Try()) { | |
920 // We're not in the middle of being sent. | |
921 | |
922 // Can get synchronously called back in Init if there was initial data. | |
923 scoped_ptr<base::AutoLock> locker; | |
924 if (!calling_init_) | |
925 locker.reset(new base::AutoLock(lock())); | |
926 | |
927 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { | |
928 if (transferable_) { | |
929 channel_->Shutdown(); | |
930 } else { | |
931 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
932 non_transferable_state_ = CLOSED; | |
933 } | |
934 channel_ = nullptr; | |
935 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
936 if (close_requested_) { | |
937 // We requested the other side to close the connection while they also | |
938 // did the same. We must balance out the AddRef in CloseOnIO to ensure | |
939 // this object isn't leaked. | |
940 call_release = true; | |
941 } | |
942 } else { | |
943 bool was_empty = message_queue_.IsEmpty(); | |
944 message_queue_.AddMessage(std::move(message)); | |
945 if (was_empty) | |
946 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
947 } | |
948 | |
949 started_transport_.Release(); | |
950 } else { | |
951 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { | |
952 // We got a request to shutdown the channel but this object is already | |
953 // calling into channel to serialize it. Since all the other side cares | |
954 // about is flushing pending messages, we bounce the quit back to it. | |
955 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
956 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); | |
957 channel_->WriteMessage(std::move(message)); | |
958 } else { | |
959 // If RawChannel is calling OnRead, that means it has its read_lock_ | |
960 // acquired. That means StartSerialize can't be accessing message queue as | |
961 // it waits on ReleaseHandle first which acquires read_lock_. | |
962 message_queue_.AddMessage(std::move(message)); | |
963 } | |
964 } | |
965 | |
966 if (call_release) | |
967 Release(); | |
968 } | |
969 | |
970 void MessagePipeDispatcher::OnError(Error error) { | |
971 // If there's a read error, then the other side of the pipe is closed. By | |
972 // definition, we can't write since there's no one to read it. And we can't | |
973 // read anymore, since we just got a read erorr. So we close the pipe. | |
974 // If there's a write error, then we stop writing. But we keep the pipe open | |
975 // until we finish reading everything in it. This is because it's valid for | |
976 // one endpoint to write some data and close their pipe immediately. Even | |
977 // though the other end can't write anymore, it should still get all the data. | |
978 switch (error) { | |
979 case ERROR_READ_SHUTDOWN: | |
980 // The other side was cleanly closed, so this isn't actually an error. | |
981 DVLOG(1) << "MessagePipeDispatcher read error (shutdown)"; | |
982 break; | |
983 case ERROR_READ_BROKEN: | |
984 DVLOG(1) << "MessagePipeDispatcher read error (connection broken)"; | |
985 break; | |
986 case ERROR_READ_BAD_MESSAGE: | |
987 // Receiving a bad message means either a bug, data corruption, or | |
988 // malicious attack (probably due to some other bug). | |
989 LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)"; | |
990 break; | |
991 case ERROR_READ_UNKNOWN: | |
992 LOG(ERROR) << "MessagePipeDispatcher read error (unknown)"; | |
993 break; | |
994 case ERROR_WRITE: | |
995 // Write errors are slightly notable: they probably shouldn't happen under | |
996 // normal operation (but maybe the other side crashed). | |
997 LOG(WARNING) << "MessagePipeDispatcher write error"; | |
998 DCHECK_EQ(write_error_, false) << "Should only get one write error."; | |
999 write_error_ = true; | |
1000 break; | |
1001 } | |
1002 | |
1003 bool call_release = false; | |
1004 if (started_transport_.Try()) { | |
1005 base::AutoLock locker(lock()); | |
1006 if (channel_ && error != ERROR_WRITE) { | |
1007 if (transferable_) { | |
1008 channel_->Shutdown(); | |
1009 } else { | |
1010 CHECK_NE(non_transferable_state_, CLOSED); | |
1011 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
1012 non_transferable_state_ = CLOSED; | |
1013 } | |
1014 channel_ = nullptr; | |
1015 if (close_requested_) { | |
1016 // Balance AddRef in CloseOnIO. | |
1017 call_release = true; | |
1018 } | |
1019 } else if (!channel_ && !transferable_ && | |
1020 non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { | |
1021 // Balance AddRef in CloseOnIO. | |
1022 call_release = true; | |
1023 } | |
1024 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
1025 started_transport_.Release(); | |
1026 } else { | |
1027 // We must be waiting to call ReleaseHandle. It will call Shutdown. | |
1028 } | |
1029 | |
1030 if (call_release) | |
1031 Release(); | |
1032 } | |
1033 | |
1034 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | |
1035 MessageInTransit* message, | |
1036 std::vector<DispatcherTransport>* transports) { | |
1037 DCHECK(!message->has_dispatchers()); | |
1038 | |
1039 // You're not allowed to send either handle to a message pipe over the message | |
1040 // pipe, so check for this. (The case of trying to write a handle to itself is | |
1041 // taken care of by |Core|. That case kind of makes sense, but leads to | |
1042 // complications if, e.g., both sides try to do the same thing with their | |
1043 // respective handles simultaneously. The other case, of trying to write the | |
1044 // peer handle to a handle, doesn't make sense -- since no handle will be | |
1045 // available to read the message from.) | |
1046 for (size_t i = 0; i < transports->size(); i++) { | |
1047 if (!(*transports)[i].is_valid()) | |
1048 continue; | |
1049 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { | |
1050 MessagePipeDispatcher* mp = | |
1051 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); | |
1052 if (transferable_ && mp->transferable_ && | |
1053 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { | |
1054 // The other case should have been disallowed by |Core|. (Note: |port| | |
1055 // is the peer port of the handle given to |WriteMessage()|.) | |
1056 return MOJO_RESULT_INVALID_ARGUMENT; | |
1057 } else if (!transferable_ && !mp->transferable_ && | |
1058 pipe_id_ == mp->pipe_id_) { | |
1059 return MOJO_RESULT_INVALID_ARGUMENT; | |
1060 } | |
1061 } | |
1062 } | |
1063 | |
1064 // Clone the dispatchers and attach them to the message. (This must be done as | |
1065 // a separate loop, since we want to leave the dispatchers alone on failure.) | |
1066 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | |
1067 dispatchers->reserve(transports->size()); | |
1068 for (size_t i = 0; i < transports->size(); i++) { | |
1069 if ((*transports)[i].is_valid()) { | |
1070 dispatchers->push_back( | |
1071 (*transports)[i].CreateEquivalentDispatcherAndClose()); | |
1072 } else { | |
1073 LOG(WARNING) << "Enqueueing null dispatcher"; | |
1074 dispatchers->push_back(nullptr); | |
1075 } | |
1076 } | |
1077 message->SetDispatchers(std::move(dispatchers)); | |
1078 return MOJO_RESULT_OK; | |
1079 } | |
1080 | |
1081 void MessagePipeDispatcher::RequestNontransferableChannel() { | |
1082 lock().AssertAcquired(); | |
1083 CHECK(!transferable_); | |
1084 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); | |
1085 non_transferable_state_ = CONNECT_CALLED; | |
1086 #if !defined(OFFICIAL_BUILD) | |
1087 non_transferable_bound_stack_.reset(new base::debug::StackTrace); | |
1088 #endif | |
1089 | |
1090 // PostTask since the broker can call us back synchronously. | |
1091 internal::g_io_thread_task_runner->PostTask( | |
1092 FROM_HERE, | |
1093 base::Bind(&Broker::ConnectMessagePipe, | |
1094 base::Unretained(internal::g_broker), pipe_id_, | |
1095 base::Unretained(this))); | |
1096 } | 612 } |
1097 | 613 |
1098 } // namespace edk | 614 } // namespace edk |
1099 } // namespace mojo | 615 } // namespace mojo |
OLD | NEW |