Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <limits> 7 #include <limits>
11 #include <utility> 8
12 #include <vector> 9 #include "base/macros.h"
13 10 #include "base/memory/ref_counted.h"
14 #include "base/bind.h" 11 #include "base/memory/scoped_ptr.h"
15 #include "base/debug/stack_trace.h"
16 #include "base/logging.h"
17 #include "base/message_loop/message_loop.h"
18 #include "mojo/edk/embedder/embedder_internal.h" 12 #include "mojo/edk/embedder/embedder_internal.h"
19 #include "mojo/edk/embedder/platform_handle_utils.h" 13 #include "mojo/edk/system/core.h"
20 #include "mojo/edk/embedder/platform_shared_buffer.h" 14 #include "mojo/edk/system/node_controller.h"
21 #include "mojo/edk/embedder/platform_support.h" 15 #include "mojo/edk/system/ports_message.h"
22 #include "mojo/edk/system/broker.h" 16 #include "mojo/public/c/system/macros.h"
23 #include "mojo/edk/system/configuration.h"
24 #include "mojo/edk/system/message_in_transit.h"
25 #include "mojo/edk/system/options_validation.h"
26 #include "mojo/edk/system/transport_data.h"
27 17
28 namespace mojo { 18 namespace mojo {
29 namespace edk { 19 namespace edk {
30 20
31 namespace { 21 namespace {
32 22
33 const uint32_t kInvalidMessagePipeHandleIndex = static_cast<uint32_t>(-1); 23 // Header attached to every message sent over a message pipe.
34 24 struct MOJO_ALIGNAS(8) MessageHeader {
35 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { 25 // The number of serialized dispatchers included in this header.
36 MOJO_ALIGNAS(1) bool transferable; 26 uint32_t num_dispatchers;
37 MOJO_ALIGNAS(1) bool write_error; 27
38 MOJO_ALIGNAS(8) uint64_t pipe_id; // If transferable is false. 28 // Total size of the header, including serialized dispatcher data.
39 // The following members are only set if transferable is true. 29 uint32_t header_size;
40 // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
41 // was closed.
42 MOJO_ALIGNAS(4) uint32_t platform_handle_index;
43
44 MOJO_ALIGNAS(4) uint32_t
45 shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
46 MOJO_ALIGNAS(4) uint32_t shared_memory_size;
47
48 MOJO_ALIGNAS(4) uint32_t serialized_read_buffer_size;
49 MOJO_ALIGNAS(4) uint32_t serialized_write_buffer_size;
50 MOJO_ALIGNAS(4) uint32_t serialized_message_queue_size;
51
52 // These are the FDs required as part of serializing channel_ and
53 // message_queue_. This is only used on POSIX.
54 MOJO_ALIGNAS(4)
55 uint32_t serialized_fds_index; // (Or |kInvalidMessagePipeHandleIndex|.)
56 MOJO_ALIGNAS(4) uint32_t serialized_read_fds_length;
57 MOJO_ALIGNAS(4) uint32_t serialized_write_fds_length;
58 MOJO_ALIGNAS(4) uint32_t serialized_message_fds_length;
59 }; 30 };
60 31
61 char* SerializeBuffer(char* start, std::vector<char>* buffer) { 32 // Header for each dispatcher, immediately following the message header.
62 if (buffer->size()) 33 struct MOJO_ALIGNAS(8) DispatcherHeader {
63 memcpy(start, &(*buffer)[0], buffer->size()); 34 // The type of the dispatcher, correpsonding to the Dispatcher::Type enum.
64 return start + buffer->size(); 35 int32_t type;
65 } 36
66 37 // The size of the serialized dispatcher, not including this header.
67 bool GetHandle(size_t index, 38 uint32_t num_bytes;
68 PlatformHandleVector* platform_handles, 39
69 ScopedPlatformHandle* handle) { 40 // The number of ports needed to deserialize this dispatcher.
70 if (index == kInvalidMessagePipeHandleIndex) 41 uint32_t num_ports;
71 return true; 42
72 43 // The number of platform handles needed to deserialize this dispatcher.
73 if (!platform_handles || index >= platform_handles->size()) { 44 uint32_t num_platform_handles;
74 LOG(ERROR) 45 };
75 << "Invalid serialized message pipe dispatcher (missing handles)"; 46
76 return false; 47 struct MOJO_ALIGNAS(8) SerializedState {
77 } 48 uint64_t pipe_id;
78 49 int8_t endpoint;
79 // We take ownership of the handle, so we have to invalidate the one in 50 };
80 // |platform_handles|.
81 handle->reset((*platform_handles)[index]);
82 (*platform_handles)[index] = PlatformHandle();
83 return true;
84 }
85
86 #if defined(OS_POSIX)
87 void ClosePlatformHandles(std::vector<int>* fds) {
88 for (size_t i = 0; i < fds->size(); ++i)
89 PlatformHandle((*fds)[i]).CloseIfNecessary();
90 }
91 #endif
92 51
93 } // namespace 52 } // namespace
94 53
95 // MessagePipeDispatcher ------------------------------------------------------- 54 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
96 55 // reference to the MPD to ensure it lives as long as the observed port.
97 const MojoCreateMessagePipeOptions 56 class MessagePipeDispatcher::PortObserverThunk
98 MessagePipeDispatcher::kDefaultCreateOptions = { 57 : public NodeController::PortObserver {
99 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), 58 public:
100 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; 59 explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
101 60 : dispatcher_(dispatcher) {}
102 MojoResult MessagePipeDispatcher::ValidateCreateOptions( 61
103 const MojoCreateMessagePipeOptions* in_options, 62 private:
104 MojoCreateMessagePipeOptions* out_options) { 63 ~PortObserverThunk() override {}
105 const MojoCreateMessagePipeOptionsFlags kKnownFlags = 64
106 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | 65 // NodeController::PortObserver:
107 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; 66 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
108 67
109 *out_options = kDefaultCreateOptions; 68 scoped_refptr<MessagePipeDispatcher> dispatcher_;
110 if (!in_options) 69
111 return MOJO_RESULT_OK; 70 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
112 71 };
113 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); 72
114 if (!reader.is_valid()) 73 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
115 return MOJO_RESULT_INVALID_ARGUMENT; 74 const ports::PortRef& port,
116 75 uint64_t pipe_id,
117 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) 76 int endpoint)
118 return MOJO_RESULT_OK; 77 : node_controller_(node_controller),
119 if ((reader.options().flags & ~kKnownFlags)) 78 port_(port),
120 return MOJO_RESULT_UNIMPLEMENTED; 79 pipe_id_(pipe_id),
121 out_options->flags = reader.options().flags; 80 endpoint_(endpoint) {
122 81 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
123 // Checks for fields beyond |flags|: 82 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
124 83
125 // (Nothing here yet.) 84 node_controller_->SetPortObserver(
126 85 port_,
127 return MOJO_RESULT_OK; 86 make_scoped_refptr(new PortObserverThunk(this)));
128 }
129
130 void MessagePipeDispatcher::Init(
131 ScopedPlatformHandle message_pipe,
132 char* serialized_read_buffer, size_t serialized_read_buffer_size,
133 char* serialized_write_buffer, size_t serialized_write_buffer_size,
134 std::vector<int>* serialized_read_fds,
135 std::vector<int>* serialized_write_fds) {
136 CHECK(transferable_);
137 if (message_pipe.get().is_valid()) {
138 channel_ = RawChannel::Create(std::move(message_pipe));
139
140 // TODO(jam): It's probably cleaner to pass this in Init call.
141 channel_->SetSerializedData(
142 serialized_read_buffer, serialized_read_buffer_size,
143 serialized_write_buffer, serialized_write_buffer_size,
144 serialized_read_fds, serialized_write_fds);
145 internal::g_io_thread_task_runner->PostTask(
146 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
147 }
148 }
149
150 void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
151 CHECK(!transferable_);
152 pipe_id_ = pipe_id;
153 }
154
155 void MessagePipeDispatcher::InitOnIO() {
156 base::AutoLock locker(lock());
157 CHECK(transferable_);
158 calling_init_ = true;
159 if (channel_)
160 channel_->Init(this);
161 calling_init_ = false;
162 }
163
164 void MessagePipeDispatcher::CloseOnIOAndRelease() {
165 {
166 base::AutoLock locker(lock());
167 CloseOnIO();
168 }
169 Release(); // To match CloseImplNoLock.
170 }
171
172 void MessagePipeDispatcher::CloseOnIO() {
173 lock().AssertAcquired();
174
175 if (channel_) {
176 // If we closed the channel now then in-flight message pipes wouldn't get
177 // closed, and their other side wouldn't get a connection error notification
178 // which could lead to hangs or leaks. So we ask the other side of this
179 // message pipe to close, which ensures that we have dispatched all
180 // in-flight message pipes.
181 DCHECK(!close_requested_);
182 close_requested_ = true;
183 AddRef();
184 scoped_ptr<MessageInTransit> message(new MessageInTransit(
185 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
186 if (!transferable_)
187 message->set_route_id(pipe_id_);
188 channel_->WriteMessage(std::move(message));
189 return;
190 }
191
192 if (!transferable_ &&
193 (non_transferable_state_ == CONNECT_CALLED ||
194 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) {
195 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
196 RequestNontransferableChannel();
197
198 // We can't cancel the pending request yet, since the other side of the
199 // message pipe would want to get pending outgoing messages (if any) or at
200 // least know that this end was closed. So keep this object alive until
201 // then.
202 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
203 AddRef();
204 }
205 } 87 }
206 88
207 Dispatcher::Type MessagePipeDispatcher::GetType() const { 89 Dispatcher::Type MessagePipeDispatcher::GetType() const {
208 return Type::MESSAGE_PIPE; 90 return Type::MESSAGE_PIPE;
209 } 91 }
210 92
211 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { 93 MojoResult MessagePipeDispatcher::Close() {
212 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); 94 base::AutoLock lock(signal_lock_);
213 base::AutoLock locker(lock()); 95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
214 channel_ = channel; 96 << " [port=" << port_.name() << "]";
215 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { 97 return CloseNoLock();
216 channel_->WriteMessage( 98 }
217 non_transferable_outgoing_message_queue_.GetMessage()); 99
218 } 100 MojoResult MessagePipeDispatcher::WriteMessage(
219
220 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
221 non_transferable_state_ = CONNECTED;
222 // We kept this object alive until it's connected, we can close it now.
223 CloseOnIO();
224 // Balance the AddRef in CloseOnIO.
225 Release();
226 return;
227 }
228
229 non_transferable_state_ = CONNECTED;
230 }
231
232 #if defined(OS_WIN)
233 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
234 // best way we want to share this.
235 // Since this is used for serialization of messages read/written to a MP that
236 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
237 // them when a MP is being sent. As a result, even for POSIX we will probably
238 // want to send the handles to the shell process and exchange them for tokens
239 // (since we can be sure that the shell will respond to our IPCs, compared to
240 // the other end where we're sending the MP to, which may not be reading...).
241 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
242 size_t num_platform_handles,
243 const void* platform_handle_table) {
244 ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector());
245 rv->resize(num_platform_handles);
246
247 const uint64_t* tokens =
248 static_cast<const uint64_t*>(platform_handle_table);
249 internal::g_broker->TokenToHandle(tokens, num_platform_handles, &rv->at(0));
250 return rv.Pass();
251 }
252 #endif
253
254 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
255 const void* source,
256 size_t size,
257 PlatformHandleVector* platform_handles) {
258 if (size != sizeof(SerializedMessagePipeHandleDispatcher)) {
259 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)";
260 return nullptr;
261 }
262
263 const SerializedMessagePipeHandleDispatcher* serialization =
264 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
265
266 scoped_refptr<MessagePipeDispatcher> rv(
267 new MessagePipeDispatcher(serialization->transferable));
268 if (!rv->transferable_) {
269 rv->InitNonTransferable(serialization->pipe_id);
270 return rv;
271 }
272
273 if (serialization->shared_memory_size !=
274 (serialization->serialized_read_buffer_size +
275 serialization->serialized_write_buffer_size +
276 serialization->serialized_message_queue_size)) {
277 LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)";
278 return nullptr;
279 }
280
281 ScopedPlatformHandle platform_handle, shared_memory_handle;
282 if (!GetHandle(serialization->platform_handle_index,
283 platform_handles, &platform_handle) ||
284 !GetHandle(serialization->shared_memory_handle_index,
285 platform_handles, &shared_memory_handle)) {
286 return nullptr;
287 }
288
289 char* serialized_read_buffer = nullptr;
290 size_t serialized_read_buffer_size = 0;
291 char* serialized_write_buffer = nullptr;
292 size_t serialized_write_buffer_size = 0;
293 char* message_queue_data = nullptr;
294 size_t message_queue_size = 0;
295 scoped_refptr<PlatformSharedBuffer> shared_buffer;
296 scoped_ptr<PlatformSharedBufferMapping> mapping;
297 if (shared_memory_handle.is_valid()) {
298 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
299 serialization->shared_memory_size, std::move(shared_memory_handle));
300 mapping = shared_buffer->Map(0, serialization->shared_memory_size);
301 char* buffer = static_cast<char*>(mapping->GetBase());
302 if (serialization->serialized_read_buffer_size) {
303 serialized_read_buffer = buffer;
304 serialized_read_buffer_size = serialization->serialized_read_buffer_size;
305 buffer += serialized_read_buffer_size;
306 }
307 if (serialization->serialized_write_buffer_size) {
308 serialized_write_buffer = buffer;
309 serialized_write_buffer_size =
310 serialization->serialized_write_buffer_size;
311 buffer += serialized_write_buffer_size;
312 }
313 if (serialization->serialized_message_queue_size) {
314 message_queue_data = buffer;
315 message_queue_size = serialization->serialized_message_queue_size;
316 buffer += message_queue_size;
317 }
318 }
319
320 rv->write_error_ = serialization->write_error;
321
322 std::vector<int> serialized_read_fds;
323 std::vector<int> serialized_write_fds;
324 #if defined(OS_POSIX)
325 std::vector<int> serialized_fds;
326 size_t serialized_fds_index = 0;
327
328 size_t total_fd_count = serialization->serialized_read_fds_length +
329 serialization->serialized_write_fds_length +
330 serialization->serialized_message_fds_length;
331 for (size_t i = 0; i < total_fd_count; ++i) {
332 ScopedPlatformHandle handle;
333 if (!GetHandle(serialization->serialized_fds_index + i, platform_handles,
334 &handle)) {
335 ClosePlatformHandles(&serialized_fds);
336 return nullptr;
337 }
338 serialized_fds.push_back(handle.release().handle);
339 }
340
341 serialized_read_fds.assign(
342 serialized_fds.begin(),
343 serialized_fds.begin() + serialization->serialized_read_fds_length);
344 serialized_fds_index += serialization->serialized_read_fds_length;
345 serialized_write_fds.assign(
346 serialized_fds.begin() + serialized_fds_index,
347 serialized_fds.begin() + serialized_fds_index +
348 serialization->serialized_write_fds_length);
349 serialized_fds_index += serialization->serialized_write_fds_length;
350 #endif
351
352 while (message_queue_size) {
353 size_t message_size;
354 if (!MessageInTransit::GetNextMessageSize(
355 message_queue_data, message_queue_size, &message_size)) {
356 NOTREACHED() << "Couldn't read message size from serialized data.";
357 return nullptr;
358 }
359 if (message_size > message_queue_size) {
360 NOTREACHED() << "Invalid serialized message size.";
361 return nullptr;
362 }
363 MessageInTransit::View message_view(message_size, message_queue_data);
364 message_queue_size -= message_size;
365 message_queue_data += message_size;
366
367 // TODO(jam): Copied below from RawChannelWin. See commment above
368 // GetReadPlatformHandles.
369 ScopedPlatformHandleVectorPtr temp_platform_handles;
370 if (message_view.transport_data_buffer()) {
371 size_t num_platform_handles;
372 const void* platform_handle_table;
373 TransportData::GetPlatformHandleTable(
374 message_view.transport_data_buffer(), &num_platform_handles,
375 &platform_handle_table);
376
377 if (num_platform_handles > 0) {
378 #if defined(OS_WIN)
379 temp_platform_handles =
380 GetReadPlatformHandles(num_platform_handles,
381 platform_handle_table).Pass();
382 if (!temp_platform_handles) {
383 LOG(ERROR) << "Invalid number of platform handles received";
384 return nullptr;
385 }
386 #else
387 temp_platform_handles.reset(new PlatformHandleVector());
388 for (size_t i = 0; i < num_platform_handles; ++i)
389 temp_platform_handles->push_back(
390 PlatformHandle(serialized_fds[serialized_fds_index++]));
391 #endif
392 }
393 }
394
395 // TODO(jam): Copied below from RawChannelWin. See commment above
396 // GetReadPlatformHandles.
397 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
398 if (message_view.transport_data_buffer_size() > 0) {
399 DCHECK(message_view.transport_data_buffer());
400 message->SetDispatchers(TransportData::DeserializeDispatchers(
401 message_view.transport_data_buffer(),
402 message_view.transport_data_buffer_size(),
403 std::move(temp_platform_handles)));
404 }
405
406 rv->message_queue_.AddMessage(std::move(message));
407 }
408
409 rv->Init(std::move(platform_handle), serialized_read_buffer,
410 serialized_read_buffer_size, serialized_write_buffer,
411 serialized_write_buffer_size, &serialized_read_fds,
412 &serialized_write_fds);
413
414 if (message_queue_size) { // Should be empty by now.
415 LOG(ERROR) << "Invalid queued messages";
416 return nullptr;
417 }
418
419 return rv;
420 }
421
422 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
423 : channel_(nullptr),
424 serialized_read_fds_length_(0u),
425 serialized_write_fds_length_(0u),
426 serialized_message_fds_length_(0u),
427 pipe_id_(0),
428 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
429 serialized_(false),
430 calling_init_(false),
431 write_error_(false),
432 transferable_(transferable),
433 close_requested_(false) {
434 }
435
436 MessagePipeDispatcher::~MessagePipeDispatcher() {
437 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
438 // exception is if they posted a task to run CloseOnIO but the IO thread shut
439 // down and so when it was deleting pending tasks it caused the last reference
440 // to destruct this object. In that case, safe to destroy the channel.
441 if (channel_ &&
442 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) {
443 if (transferable_) {
444 channel_->Shutdown();
445 } else {
446 internal::g_broker->CloseMessagePipe(pipe_id_, this);
447 }
448 } else {
449 DCHECK(!channel_);
450 }
451 #if defined(OS_POSIX)
452 ClosePlatformHandles(&serialized_fds_);
453 #endif
454 }
455
456 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
457 lock().AssertAcquired();
458 awakable_list_.CancelAll();
459 }
460
461 void MessagePipeDispatcher::CloseImplNoLock() {
462 lock().AssertAcquired();
463 // This early circuit fixes leak in unit tests. There's nothing to do in the
464 // posted task.
465 if (!transferable_ && non_transferable_state_ == CLOSED)
466 return;
467
468 // We take a manual refcount because at shutdown, the task below might not get
469 // a chance to execute. If that happens, the RawChannel will still call our
470 // OnError method because it always runs (since it watches thread
471 // destruction). So to avoid UAF, manually add a reference and only release it
472 // if the task runs.
473 AddRef();
474 if (!internal::g_io_thread_task_runner->PostTask(
475 FROM_HERE,
476 base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this))) {
477 // Avoid a shutdown leak in unittests. If the thread is shutting down,
478 // we can't connect to the other end to let it know that we're closed either
479 // way.
480 if (!transferable_ && non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
481 Release();
482 }
483 }
484
485 void MessagePipeDispatcher::SerializeInternal() {
486 serialized_ = true;
487 if (!transferable_) {
488 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
489 << "Non transferable message pipe being sent after read/write/waited. "
490 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
491 << "the pipe can be sent after it's read or written. This message pipe "
492 << "was previously bound at:\n"
493 << non_transferable_bound_stack_->ToString();
494
495 non_transferable_state_ = SERIALISED;
496 return;
497 }
498
499 // We need to stop watching handle immediately, even though not on IO thread,
500 // so that other messages aren't read after this.
501 std::vector<int> serialized_read_fds, serialized_write_fds;
502 if (channel_) {
503 bool write_error = false;
504
505 serialized_platform_handle_ = channel_->ReleaseHandle(
506 &serialized_read_buffer_, &serialized_write_buffer_,
507 &serialized_read_fds, &serialized_write_fds, &write_error);
508 serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(),
509 serialized_read_fds.end());
510 serialized_read_fds_length_ = serialized_read_fds.size();
511 serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(),
512 serialized_write_fds.end());
513 serialized_write_fds_length_ = serialized_write_fds.size();
514 channel_ = nullptr;
515 } else {
516 // It's valid that the other side wrote some data and closed its end.
517 }
518
519 DCHECK(serialized_message_queue_.empty());
520 while (!message_queue_.IsEmpty()) {
521 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
522
523 // When MojoWriteMessage is called, the MessageInTransit doesn't have
524 // dispatchers set and CreateEquivaent... is called since the dispatchers
525 // can be referenced by others. here dispatchers aren't referenced by
526 // others, but rawchannel can still call to them. so since we dont call
527 // createequiv, manually call TransportStarted and TransportEnd.
528 DispatcherVector dispatchers;
529 if (message->has_dispatchers())
530 dispatchers = *message->dispatchers();
531 for (size_t i = 0; i < dispatchers.size(); ++i)
532 dispatchers[i]->TransportStarted();
533
534 // TODO(jam): this handling for dispatchers only works on windows where we
535 // send transportdata as bytes instead of as parameters to sendmsg.
536 message->SerializeAndCloseDispatchers();
537 // cont'd below
538
539 size_t main_buffer_size = message->main_buffer_size();
540 size_t transport_data_buffer_size = message->transport_data() ?
541 message->transport_data()->buffer_size() : 0;
542
543 serialized_message_queue_.insert(
544 serialized_message_queue_.end(),
545 static_cast<const char*>(message->main_buffer()),
546 static_cast<const char*>(message->main_buffer()) + main_buffer_size);
547
548 // cont'd
549 if (transport_data_buffer_size != 0) {
550 // TODO(jam): copied from RawChannelWin::WriteNoLock(
551 PlatformHandleVector* all_platform_handles =
552 message->transport_data()->platform_handles();
553 if (all_platform_handles) {
554 #if defined(OS_WIN)
555 uint64_t* tokens = reinterpret_cast<uint64_t*>(
556 static_cast<char*>(message->transport_data()->buffer()) +
557 message->transport_data()->platform_handle_table_offset());
558 internal::g_broker->HandleToToken(
559 &all_platform_handles->at(0), all_platform_handles->size(), tokens);
560 for (size_t i = 0; i < all_platform_handles->size(); i++)
561 all_platform_handles->at(i) = PlatformHandle();
562 #else
563 for (size_t i = 0; i < all_platform_handles->size(); i++) {
564 serialized_fds_.push_back(all_platform_handles->at(i).handle);
565 serialized_message_fds_length_++;
566 all_platform_handles->at(i) = PlatformHandle();
567 }
568 #endif
569 }
570
571 serialized_message_queue_.insert(
572 serialized_message_queue_.end(),
573 static_cast<const char*>(message->transport_data()->buffer()),
574 static_cast<const char*>(message->transport_data()->buffer()) +
575 transport_data_buffer_size);
576 }
577
578 for (size_t i = 0; i < dispatchers.size(); ++i)
579 dispatchers[i]->TransportEnded();
580 }
581 }
582
583 scoped_refptr<Dispatcher>
584 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
585 lock().AssertAcquired();
586
587 SerializeInternal();
588
589 scoped_refptr<MessagePipeDispatcher> rv(
590 new MessagePipeDispatcher(transferable_));
591 rv->serialized_ = true;
592 if (transferable_) {
593 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
594 serialized_message_queue_.swap(rv->serialized_message_queue_);
595 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
596 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
597 serialized_fds_.swap(rv->serialized_fds_);
598 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
599 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
600 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
601 rv->write_error_ = write_error_;
602 } else {
603 rv->pipe_id_ = pipe_id_;
604 rv->non_transferable_state_ = non_transferable_state_;
605 }
606 return rv;
607 }
608
609 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
610 const void* bytes, 101 const void* bytes,
611 uint32_t num_bytes, 102 uint32_t num_bytes,
612 std::vector<DispatcherTransport>* transports, 103 const DispatcherInTransit* dispatchers,
104 uint32_t num_dispatchers,
613 MojoWriteMessageFlags flags) { 105 MojoWriteMessageFlags flags) {
614 lock().AssertAcquired(); 106
615 107 {
616 DCHECK(!transports || 108 base::AutoLock lock(signal_lock_);
617 (transports->size() > 0 && 109 if (port_closed_ || in_transit_)
618 transports->size() <= GetConfiguration().max_message_num_handles)); 110 return MOJO_RESULT_INVALID_ARGUMENT;
619 111 }
620 if (write_error_ || 112
621 (transferable_ && !channel_) || 113 // A structure for retaining information about every Dispatcher we're about
622 (!transferable_ && non_transferable_state_ == CLOSED)) { 114 // to send. This information is collected by calling StartSerialize() on
115 // each dispatcher in sequence.
116 struct DispatcherInfo {
117 uint32_t num_bytes;
118 uint32_t num_ports;
119 uint32_t num_handles;
120 };
121
122 // This is only the base header size. It will grow as we accumulate the
123 // size of serialized state for each dispatcher.
124 size_t header_size = sizeof(MessageHeader) +
125 num_dispatchers * sizeof(DispatcherHeader);
126
127 size_t num_ports = 0;
128 size_t num_handles = 0;
129
130 std::vector<DispatcherInfo> dispatcher_info(num_dispatchers);
131 for (size_t i = 0; i < num_dispatchers; ++i) {
132 Dispatcher* d = dispatchers[i].dispatcher.get();
133 d->StartSerialize(&dispatcher_info[i].num_bytes,
134 &dispatcher_info[i].num_ports,
135 &dispatcher_info[i].num_handles);
136 header_size += dispatcher_info[i].num_bytes;
137 num_ports += dispatcher_info[i].num_ports;
138 num_handles += dispatcher_info[i].num_handles;
139 }
140
141 // We now have enough information to fully allocate the message storage.
142 scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage(
143 header_size + num_bytes, num_ports, num_handles);
144 DCHECK(message);
145
146 // Populate the message header with information about serialized dispatchers.
147 //
148 // The front of the message is always a MessageHeader followed by a
149 // DispatcherHeader for each dispatcher to be sent.
150 MessageHeader* header =
151 static_cast<MessageHeader*>(message->mutable_payload_bytes());
152 DispatcherHeader* dispatcher_headers =
153 reinterpret_cast<DispatcherHeader*>(header + 1);
154
155 // Serialized dispatcher state immediately follows the series of
156 // DispatcherHeaders.
157 char* dispatcher_data =
158 reinterpret_cast<char*>(dispatcher_headers + num_dispatchers);
159
160 header->num_dispatchers = num_dispatchers;
161
162 // |header_size| is the total number of bytes preceding the message payload,
163 // including all dispatcher headers and serialized dispatcher state.
164 DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max());
165 header->header_size = static_cast<uint32_t>(header_size);
166
167 bool cancel_transit = false;
168 if (num_dispatchers > 0) {
169 ScopedPlatformHandleVectorPtr handles(
170 new PlatformHandleVector(num_handles));
171 size_t port_index = 0;
172 size_t handle_index = 0;
173 for (size_t i = 0; i < num_dispatchers; ++i) {
174 Dispatcher* d = dispatchers[i].dispatcher.get();
175 DispatcherHeader* dh = &dispatcher_headers[i];
176 const DispatcherInfo& info = dispatcher_info[i];
177
178 // Fill in the header for this dispatcher.
179 dh->type = static_cast<int32_t>(d->GetType());
180 dh->num_bytes = info.num_bytes;
181 dh->num_ports = info.num_ports;
182 dh->num_platform_handles = info.num_handles;
183
184 // Fill in serialized state, ports, and platform handles. We'll cancel
185 // the send if the dispatcher implementation rejects for some reason.
186 if (!d->EndSerialize(static_cast<void*>(dispatcher_data),
187 message->mutable_ports() + port_index,
188 handles->data() + handle_index)) {
189 cancel_transit = true;
190 break;
191 }
192
193 dispatcher_data += info.num_bytes;
194 port_index += info.num_ports;
195 handle_index += info.num_handles;
196 }
197
198 if (!cancel_transit) {
199 // Take ownership of all the handles and move them into message storage.
200 message->SetHandles(std::move(handles));
201 } else {
202 // Release any platform handles we've accumulated. Their dispatchers
203 // retain ownership when transit is canceled, so these are not actually
204 // leaking.
205 handles->clear();
206 }
207 }
208
209 MojoResult result = MOJO_RESULT_OK;
210 if (!cancel_transit) {
211 // Copy the message body.
212 void* message_body = static_cast<void*>(
213 static_cast<char*>(message->mutable_payload_bytes()) + header_size);
214 memcpy(message_body, bytes, num_bytes);
215
216 int rv = node_controller_->SendMessage(port_, &message);
217
218 DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
219 << " [port=" << port_.name() << "; rv=" << rv
220 << "; num_bytes=" << num_bytes << "]";
221
222 if (rv != ports::OK) {
223 if (rv == ports::ERROR_PORT_UNKNOWN ||
224 rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
225 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
226 result = MOJO_RESULT_INVALID_ARGUMENT;
227 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
228 base::AutoLock lock(signal_lock_);
229 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
230 result = MOJO_RESULT_FAILED_PRECONDITION;
231 } else {
232 NOTREACHED();
233 result = MOJO_RESULT_UNKNOWN;
234 }
235 cancel_transit = true;
236 } else {
237 DCHECK(!message);
238 }
239 }
240
241 if (cancel_transit) {
242 // We ended up not sending the message. Release all the platform handles.
243 // Their dispatchers retain ownership when transit is canceled, so these are
244 // not actually leaking.
245 DCHECK(message);
246 Channel::MessagePtr m = message->TakeChannelMessage();
247 ScopedPlatformHandleVectorPtr handles = m->TakeHandles();
248 if (handles)
249 handles->clear();
250 }
251
252 return result;
253 }
254
255 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
256 uint32_t* num_bytes,
257 MojoHandle* handles,
258 uint32_t* num_handles,
259 MojoReadMessageFlags flags) {
260 {
261 base::AutoLock lock(signal_lock_);
262 // We can't read from a port that's closed or in transit!
263 if (port_closed_ || in_transit_)
264 return MOJO_RESULT_INVALID_ARGUMENT;
265 }
266
267 bool no_space = false;
268 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
269
270 // Ensure the provided buffers are large enough to hold the next message.
271 // GetMessageIf provides an atomic way to test the next message without
272 // committing to removing it from the port's underlying message queue until
273 // we are sure we can consume it.
274
275 ports::ScopedMessage ports_message;
276 int rv = node_controller_->node()->GetMessageIf(
277 port_,
278 [num_bytes, num_handles, &no_space, &may_discard](
279 const ports::Message& next_message) {
280 const PortsMessage& message =
281 static_cast<const PortsMessage&>(next_message);
282 DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader));
283
284 const MessageHeader* header =
285 static_cast<const MessageHeader*>(message.payload_bytes());
286 DCHECK_LE(header->header_size, message.num_payload_bytes());
287
288 uint32_t bytes_to_read = 0;
289 uint32_t bytes_available =
290 static_cast<uint32_t>(message.num_payload_bytes()) -
291 header->header_size;
292 if (num_bytes) {
293 bytes_to_read = std::min(*num_bytes, bytes_available);
294 *num_bytes = bytes_available;
295 }
296
297 uint32_t handles_to_read = 0;
298 uint32_t handles_available = header->num_dispatchers;
299 if (num_handles) {
300 handles_to_read = std::min(*num_handles, handles_available);
301 *num_handles = handles_available;
302 }
303
304 if (bytes_to_read < bytes_available ||
305 handles_to_read < handles_available) {
306 no_space = true;
307 return may_discard;
308 }
309
310 return true;
311 },
312 &ports_message);
313
314 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
315 if (rv == ports::ERROR_PORT_UNKNOWN ||
316 rv == ports::ERROR_PORT_STATE_UNEXPECTED)
317 return MOJO_RESULT_INVALID_ARGUMENT;
318
319 NOTREACHED();
320 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here?
321 }
322
323 if (no_space) {
324 // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this
325 // message's data. The message will still be in queue unless
326 // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
327 return MOJO_RESULT_RESOURCE_EXHAUSTED;
328 }
329
330 if (!ports_message) {
331 // No message was available in queue.
332
333 if (rv == ports::OK)
334 return MOJO_RESULT_SHOULD_WAIT;
335
336 // Peer is closed and there are no more messages to read.
337 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
338 base::AutoLock lock(signal_lock_);
339 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
623 return MOJO_RESULT_FAILED_PRECONDITION; 340 return MOJO_RESULT_FAILED_PRECONDITION;
624 } 341 }
625 342
626 if (num_bytes > GetConfiguration().max_message_num_bytes) 343 // Alright! We have a message and the caller has provided sufficient storage
627 return MOJO_RESULT_RESOURCE_EXHAUSTED; 344 // in which to receive it.
628 scoped_ptr<MessageInTransit> message(new MessageInTransit( 345
629 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); 346 scoped_ptr<PortsMessage> message(
630 if (transports) { 347 static_cast<PortsMessage*>(ports_message.release()));
631 MojoResult result = AttachTransportsNoLock(message.get(), transports); 348
632 if (result != MOJO_RESULT_OK) 349 const MessageHeader* header =
633 return result; 350 static_cast<const MessageHeader*>(message->payload_bytes());
634 } 351 const DispatcherHeader* dispatcher_headers =
635 352 reinterpret_cast<const DispatcherHeader*>(header + 1);
636 message->SerializeAndCloseDispatchers(); 353
637 if (!transferable_) 354 const char* dispatcher_data = reinterpret_cast<const char*>(
638 message->set_route_id(pipe_id_); 355 dispatcher_headers + header->num_dispatchers);
639 if (!transferable_ && 356
640 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || 357 // Deserialize dispatchers.
641 non_transferable_state_ == CONNECT_CALLED)) { 358 if (header->num_dispatchers > 0) {
642 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) 359 CHECK(handles);
643 RequestNontransferableChannel(); 360 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers);
644 non_transferable_outgoing_message_queue_.AddMessage(std::move(message)); 361 size_t data_payload_index = sizeof(MessageHeader) +
645 } else { 362 header->num_dispatchers * sizeof(DispatcherHeader);
646 channel_->WriteMessage(std::move(message)); 363 size_t port_index = 0;
647 } 364 size_t platform_handle_index = 0;
365 for (size_t i = 0; i < header->num_dispatchers; ++i) {
366 const DispatcherHeader& dh = dispatcher_headers[i];
367 Type type = static_cast<Type>(dh.type);
368
369 DCHECK_GE(message->num_payload_bytes(),
370 data_payload_index + dh.num_bytes);
371 DCHECK_GE(message->num_ports(),
372 port_index + dh.num_ports);
373 DCHECK_GE(message->num_handles(),
374 platform_handle_index + dh.num_platform_handles);
375
376 PlatformHandle* out_handles =
377 message->num_handles() ? message->handles() + platform_handle_index
378 : nullptr;
379 dispatchers[i].dispatcher = Dispatcher::Deserialize(
380 type, dispatcher_data, dh.num_bytes, message->ports() + port_index,
381 dh.num_ports, out_handles, dh.num_platform_handles);
382 if (!dispatchers[i].dispatcher)
383 return MOJO_RESULT_UNKNOWN;
384
385 dispatcher_data += dh.num_bytes;
386 data_payload_index += dh.num_bytes;
387 port_index += dh.num_ports;
388 platform_handle_index += dh.num_platform_handles;
389 }
390
391 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers,
392 handles))
393 return MOJO_RESULT_UNKNOWN;
394 }
395
396 // Copy message bytes.
397 DCHECK_GE(message->num_payload_bytes(), header->header_size);
398 const char* payload = reinterpret_cast<const char*>(message->payload_bytes());
399 memcpy(bytes, payload + header->header_size,
400 message->num_payload_bytes() - header->header_size);
648 401
649 return MOJO_RESULT_OK; 402 return MOJO_RESULT_OK;
650 } 403 }
651 404
652 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( 405 HandleSignalsState
653 void* bytes, 406 MessagePipeDispatcher::GetHandleSignalsState() const {
654 uint32_t* num_bytes, 407 base::AutoLock lock(signal_lock_);
655 DispatcherVector* dispatchers, 408 return GetHandleSignalsStateNoLock();
656 uint32_t* num_dispatchers, 409 }
657 MojoReadMessageFlags flags) { 410
658 lock().AssertAcquired(); 411 MojoResult MessagePipeDispatcher::AddAwakable(
659 if (transferable_ && channel_) {
660 channel_->EnsureLazyInitialized();
661 } else if (!transferable_) {
662 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
663 RequestNontransferableChannel();
664 return MOJO_RESULT_SHOULD_WAIT;
665 } else if (non_transferable_state_ == CONNECT_CALLED) {
666 return MOJO_RESULT_SHOULD_WAIT;
667 }
668 }
669
670 DCHECK(!dispatchers || dispatchers->empty());
671
672 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
673 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
674
675 if (message_queue_.IsEmpty())
676 return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION;
677
678 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
679 // and release the lock immediately.
680 bool enough_space = true;
681 MessageInTransit* message = message_queue_.PeekMessage();
682 if (num_bytes)
683 *num_bytes = message->num_bytes();
684 if (message->num_bytes() <= max_bytes)
685 memcpy(bytes, message->bytes(), message->num_bytes());
686 else
687 enough_space = false;
688
689 if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
690 if (num_dispatchers)
691 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
692 if (enough_space) {
693 if (queued_dispatchers->empty()) {
694 // Nothing to do.
695 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
696 DCHECK(dispatchers);
697 dispatchers->swap(*queued_dispatchers);
698 } else {
699 enough_space = false;
700 }
701 }
702 } else {
703 if (num_dispatchers)
704 *num_dispatchers = 0;
705 }
706
707 message = nullptr;
708
709 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
710 message_queue_.DiscardMessage();
711
712 // Now it's empty, thus no longer readable.
713 if (message_queue_.IsEmpty()) {
714 // It's currently not possible to wait for non-readability, but we should
715 // do the state change anyway.
716 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
717 }
718 }
719
720 if (!enough_space)
721 return MOJO_RESULT_RESOURCE_EXHAUSTED;
722
723 return MOJO_RESULT_OK;
724 }
725
726 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
727 const {
728 lock().AssertAcquired();
729
730 HandleSignalsState rv;
731 if (!message_queue_.IsEmpty())
732 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
733 if (!message_queue_.IsEmpty() ||
734 (transferable_ && channel_) ||
735 (!transferable_ && non_transferable_state_ != CLOSED))
736 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
737 if (!write_error_ &&
738 ((transferable_ && channel_) ||
739 (!transferable_ && non_transferable_state_ != CLOSED))) {
740 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
741 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
742 }
743 if (write_error_ ||
744 (transferable_ && !channel_) ||
745 (!transferable_ &&
746 ((non_transferable_state_ == CLOSED) || is_closed()))) {
747 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
748 }
749 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
750 return rv;
751 }
752
753 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
754 Awakable* awakable, 412 Awakable* awakable,
755 MojoHandleSignals signals, 413 MojoHandleSignals signals,
756 uintptr_t context, 414 uintptr_t context,
757 HandleSignalsState* signals_state) { 415 HandleSignalsState* signals_state) {
758 lock().AssertAcquired(); 416 base::AutoLock lock(signal_lock_);
759 if (transferable_ && channel_) { 417
760 channel_->EnsureLazyInitialized(); 418 if (port_closed_ || in_transit_) {
761 } else if (!transferable_ && 419 if (signals_state)
762 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { 420 *signals_state = HandleSignalsState();
763 RequestNontransferableChannel(); 421 return MOJO_RESULT_INVALID_ARGUMENT;
764 } 422 }
765 423
766 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 424 HandleSignalsState state = GetHandleSignalsStateNoLock();
425
426 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
427 << endpoint_ << " [awakable=" << awakable << "; port="
428 << port_.name() << "; signals=" << signals << "; satisfied="
429 << state.satisfied_signals << "; satisfiable="
430 << state.satisfiable_signals << "]";
431
767 if (state.satisfies(signals)) { 432 if (state.satisfies(signals)) {
768 if (signals_state) 433 if (signals_state)
769 *signals_state = state; 434 *signals_state = state;
435 DVLOG(2) << "Signals already set for " << port_.name();
770 return MOJO_RESULT_ALREADY_EXISTS; 436 return MOJO_RESULT_ALREADY_EXISTS;
771 } 437 }
772 if (!state.can_satisfy(signals)) { 438 if (!state.can_satisfy(signals)) {
773 if (signals_state) 439 if (signals_state)
774 *signals_state = state; 440 *signals_state = state;
441 DVLOG(2) << "Signals impossible to satisfy for " << port_.name();
775 return MOJO_RESULT_FAILED_PRECONDITION; 442 return MOJO_RESULT_FAILED_PRECONDITION;
776 } 443 }
777 444
778 awakable_list_.Add(awakable, signals, context); 445 DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint "
446 << endpoint_ << " [awakable=" << awakable << "; port="
447 << port_.name() << "; signals=" << signals << "]";
448
449 awakables_.Add(awakable, signals, context);
779 return MOJO_RESULT_OK; 450 return MOJO_RESULT_OK;
780 } 451 }
781 452
782 void MessagePipeDispatcher::RemoveAwakableImplNoLock( 453 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
783 Awakable* awakable, 454 HandleSignalsState* signals_state) {
784 HandleSignalsState* signals_state) { 455 base::AutoLock lock(signal_lock_);
785 lock().AssertAcquired(); 456 if (port_closed_ || in_transit_) {
786 457 if (signals_state)
787 awakable_list_.Remove(awakable); 458 *signals_state = HandleSignalsState();
788 if (signals_state) 459 } else if (signals_state) {
789 *signals_state = GetHandleSignalsStateImplNoLock(); 460 *signals_state = GetHandleSignalsStateNoLock();
790 } 461 }
791 462
792 void MessagePipeDispatcher::StartSerializeImplNoLock( 463 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
793 size_t* max_size, 464 << endpoint_ << " [awakable=" << awakable << "; port="
794 size_t* max_platform_handles) { 465 << port_.name() << "]";
795 if (!serialized_) 466
796 SerializeInternal(); 467 awakables_.Remove(awakable);
797 468 }
798 *max_platform_handles = 0; 469
799 if (serialized_platform_handle_.is_valid()) 470 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
800 (*max_platform_handles)++; 471 uint32_t* num_ports,
801 if (!serialized_read_buffer_.empty() || 472 uint32_t* num_handles) {
802 !serialized_write_buffer_.empty() || 473 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
803 !serialized_message_queue_.empty()) 474 *num_ports = 1;
804 (*max_platform_handles)++; 475 *num_handles = 0;
805 *max_platform_handles += serialized_fds_.size(); 476 }
806 *max_size = sizeof(SerializedMessagePipeHandleDispatcher); 477
807 } 478 bool MessagePipeDispatcher::EndSerialize(void* destination,
808 479 ports::PortName* ports,
809 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( 480 PlatformHandle* handles) {
810 void* destination, 481 SerializedState* state = static_cast<SerializedState*>(destination);
811 size_t* actual_size, 482 state->pipe_id = pipe_id_;
812 PlatformHandleVector* platform_handles) { 483 state->endpoint = static_cast<int8_t>(endpoint_);
813 CloseImplNoLock(); 484 ports[0] = port_.name();
814 SerializedMessagePipeHandleDispatcher* serialization = 485 return true;
815 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); 486 }
816 serialization->transferable = transferable_; 487
817 serialization->pipe_id = pipe_id_; 488 bool MessagePipeDispatcher::BeginTransit() {
818 if (serialized_platform_handle_.is_valid()) { 489 base::AutoLock lock(signal_lock_);
819 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); 490 if (in_transit_)
820 serialization->platform_handle_index = 491 return false;
821 static_cast<uint32_t>(platform_handles->size()); 492 in_transit_ = true;
822 platform_handles->push_back(serialized_platform_handle_.release()); 493 return in_transit_;
494 }
495
// Completes a successful transfer of this pipe endpoint to another node: the
// underlying port now belongs to the receiver, so stop observing it and close
// this dispatcher without closing the port itself.
496 void MessagePipeDispatcher::CompleteTransitAndClose() {
// Unregister the observer before taking |signal_lock_| so no further
// OnPortStatusChanged() callbacks fire for a port we no longer own.
497 node_controller_->SetPortObserver(port_, nullptr);
498 
499 base::AutoLock lock(signal_lock_);
500 in_transit_ = false;
// Setting |port_transferred_| makes CloseNoLock() skip ClosePort(): the
// receiving node is now responsible for the port's lifetime.
501 port_transferred_ = true;
502 CloseNoLock();
503 }
504
// Aborts a pending transfer of this endpoint: the dispatcher keeps ownership
// of its port and becomes usable again.
505 void MessagePipeDispatcher::CancelTransit() {
506 base::AutoLock lock(signal_lock_);
507 in_transit_ = false;
508 
509 // Something may have happened while we were waiting for potential transit.
// (e.g. peer closure or message arrival), so re-evaluate signal state and
// wake any waiting awakables.
510 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
511 }
512
513 // static
514 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
515 const void* data,
516 size_t num_bytes,
517 const ports::PortName* ports,
518 size_t num_ports,
519 PlatformHandle* handles,
520 size_t num_handles) {
521 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
522 return nullptr;
523
524 const SerializedState* state = static_cast<const SerializedState*>(data);
525
526 ports::PortRef port;
527 CHECK_EQ(
528 ports::OK,
529 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
530
531 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
532 state->pipe_id, state->endpoint);
533 }
534
// A dispatcher must already be closed (and not mid-transfer) by the time it
// is destroyed; Close()/CompleteTransitAndClose() handle releasing the port.
535 MessagePipeDispatcher::~MessagePipeDispatcher() {
536 DCHECK(port_closed_ && !in_transit_);
537 }
538
// Closes this dispatcher. Must be called with |signal_lock_| held. Returns
// MOJO_RESULT_INVALID_ARGUMENT if the dispatcher is already closed or is
// currently in transit to another node; MOJO_RESULT_OK otherwise.
539 MojoResult MessagePipeDispatcher::CloseNoLock() {
540 signal_lock_.AssertAcquired();
541 if (port_closed_ || in_transit_)
542 return MOJO_RESULT_INVALID_ARGUMENT;
543 
544 port_closed_ = true;
// No awakable can ever be satisfied again once the dispatcher is closed.
545 awakables_.CancelAll();
546 
// Only close the port if we still own it; a transferred port now belongs to
// the receiving node.
547 if (!port_transferred_) {
// NOTE(review): the lock is dropped around ClosePort() — presumably to avoid
// reentrancy/lock-order issues with port-status callbacks; confirm.
548 base::AutoUnlock unlock(signal_lock_);
549 node_controller_->ClosePort(port_);
550 }
551 
552 return MOJO_RESULT_OK;
553 }
554
555 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
556 HandleSignalsState rv;
557
558 ports::PortStatus port_status;
559 if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
560 CHECK(in_transit_ || port_transferred_ || port_closed_);
561 return HandleSignalsState();
562 }
563
564 if (port_status.has_messages) {
565 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
566 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
567 }
568 if (port_status.receiving_messages)
569 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
570 if (!port_status.peer_closed) {
571 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
572 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
573 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
823 } else { 574 } else {
824 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; 575 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
825 } 576 }
826 577 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
827 serialization->write_error = write_error_; 578 return rv;
828 DCHECK(serialized_read_buffer_.size() < std::numeric_limits<uint32_t>::max()); 579 }
829 serialization->serialized_read_buffer_size = 580
830 static_cast<uint32_t>(serialized_read_buffer_.size()); 581 void MessagePipeDispatcher::OnPortStatusChanged() {
831 DCHECK(serialized_write_buffer_.size() < 582 base::AutoLock lock(signal_lock_);
832 std::numeric_limits<uint32_t>::max()); 583
833 serialization->serialized_write_buffer_size = 584 // We stop observing our port as soon as it's transferred, but this can race
834 static_cast<uint32_t>(serialized_write_buffer_.size()); 585 // with events which are raised right before that happens. This is fine to
835 DCHECK(serialized_message_queue_.size() < 586 // ignore.
836 std::numeric_limits<uint32_t>::max()); 587 if (port_transferred_)
837 serialization->serialized_message_queue_size = 588 return;
838 static_cast<uint32_t>(serialized_message_queue_.size()); 589
839 590 #if !defined(NDEBUG)
840 serialization->shared_memory_size = static_cast<uint32_t>( 591 ports::PortStatus port_status;
841 serialization->serialized_read_buffer_size + 592 node_controller_->node()->GetStatus(port_, &port_status);
842 serialization->serialized_write_buffer_size + 593 if (port_status.has_messages) {
843 serialization->serialized_message_queue_size); 594 ports::ScopedMessage unused;
844 if (serialization->shared_memory_size) { 595 size_t message_size = 0;
845 scoped_refptr<PlatformSharedBuffer> shared_buffer( 596 node_controller_->node()->GetMessageIf(
846 internal::g_platform_support->CreateSharedBuffer( 597 port_, [&message_size](const ports::Message& message) {
847 serialization->shared_memory_size)); 598 message_size = message.num_payload_bytes();
848 scoped_ptr<PlatformSharedBufferMapping> mapping( 599 return false;
849 shared_buffer->Map(0, serialization->shared_memory_size)); 600 }, &unused);
850 char* start = static_cast<char*>(mapping->GetBase()); 601 DVLOG(1) << "New message detected on message pipe " << pipe_id_
851 start = SerializeBuffer(start, &serialized_read_buffer_); 602 << " endpoint " << endpoint_ << " [port=" << port_.name()
852 start = SerializeBuffer(start, &serialized_write_buffer_); 603 << "; size=" << message_size << "]";
853 start = SerializeBuffer(start, &serialized_message_queue_); 604 }
854 605 if (port_status.peer_closed) {
855 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); 606 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_
856 serialization->shared_memory_handle_index = 607 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
857 static_cast<uint32_t>(platform_handles->size()); 608 }
858 platform_handles->push_back(shared_buffer->PassPlatformHandle().release());
859 } else {
860 serialization->shared_memory_handle_index = kInvalidMessagePipeHandleIndex;
861 }
862
863 DCHECK(serialized_read_fds_length_ < std::numeric_limits<uint32_t>::max());
864 serialization->serialized_read_fds_length =
865 static_cast<uint32_t>(serialized_read_fds_length_);
866 DCHECK(serialized_write_fds_length_ < std::numeric_limits<uint32_t>::max());
867 serialization->serialized_write_fds_length =
868 static_cast<uint32_t>(serialized_write_fds_length_);
869 DCHECK(serialized_message_fds_length_ < std::numeric_limits<uint32_t>::max());
870 serialization->serialized_message_fds_length =
871 static_cast<uint32_t>(serialized_message_fds_length_);
872 if (serialized_fds_.empty()) {
873 serialization->serialized_fds_index = kInvalidMessagePipeHandleIndex;
874 } else {
875 #if defined(OS_POSIX)
876 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
877 serialization->serialized_fds_index =
878 static_cast<uint32_t>(platform_handles->size());
879 for (size_t i = 0; i < serialized_fds_.size(); ++i)
880 platform_handles->push_back(PlatformHandle(serialized_fds_[i]));
881 serialized_fds_.clear();
882 #endif 609 #endif
883 } 610
884 611 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
885 *actual_size = sizeof(SerializedMessagePipeHandleDispatcher);
886 return true;
887 }
888
889 void MessagePipeDispatcher::TransportStarted() {
890 started_transport_.Acquire();
891 }
892
893 void MessagePipeDispatcher::TransportEnded() {
894 started_transport_.Release();
895
896 base::AutoLock locker(lock());
897
898 // If transporting of the MPD failed, we might have received more data
899 // that we never woke awakables for.
900 // TODO(jam): should we care about only alerting if it was empty before
901 // TransportStarted?
902 if (!message_queue_.IsEmpty())
903 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
904 }
905
906 void MessagePipeDispatcher::OnReadMessage(
907 const MessageInTransit::View& message_view,
908 ScopedPlatformHandleVectorPtr platform_handles) {
909 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
910 if (message_view.transport_data_buffer_size() > 0) {
911 DCHECK(message_view.transport_data_buffer());
912 message->SetDispatchers(TransportData::DeserializeDispatchers(
913 message_view.transport_data_buffer(),
914 message_view.transport_data_buffer_size(),
915 std::move(platform_handles)));
916 }
917
918 bool call_release = false;
919 if (started_transport_.Try()) {
920 // We're not in the middle of being sent.
921
922 // Can get synchronously called back in Init if there was initial data.
923 scoped_ptr<base::AutoLock> locker;
924 if (!calling_init_)
925 locker.reset(new base::AutoLock(lock()));
926
927 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
928 if (transferable_) {
929 channel_->Shutdown();
930 } else {
931 internal::g_broker->CloseMessagePipe(pipe_id_, this);
932 non_transferable_state_ = CLOSED;
933 }
934 channel_ = nullptr;
935 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
936 if (close_requested_) {
937 // We requested the other side to close the connection while they also
938 // did the same. We must balance out the AddRef in CloseOnIO to ensure
939 // this object isn't leaked.
940 call_release = true;
941 }
942 } else {
943 bool was_empty = message_queue_.IsEmpty();
944 message_queue_.AddMessage(std::move(message));
945 if (was_empty)
946 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
947 }
948
949 started_transport_.Release();
950 } else {
951 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
952 // We got a request to shutdown the channel but this object is already
953 // calling into channel to serialize it. Since all the other side cares
954 // about is flushing pending messages, we bounce the quit back to it.
955 scoped_ptr<MessageInTransit> message(new MessageInTransit(
956 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
957 channel_->WriteMessage(std::move(message));
958 } else {
959 // If RawChannel is calling OnRead, that means it has its read_lock_
960 // acquired. That means StartSerialize can't be accessing message queue as
961 // it waits on ReleaseHandle first which acquires read_lock_.
962 message_queue_.AddMessage(std::move(message));
963 }
964 }
965
966 if (call_release)
967 Release();
968 }
969
970 void MessagePipeDispatcher::OnError(Error error) {
971 // If there's a read error, then the other side of the pipe is closed. By
972 // definition, we can't write since there's no one to read it. And we can't
973 // read anymore, since we just got a read error. So we close the pipe.
974 // If there's a write error, then we stop writing. But we keep the pipe open
975 // until we finish reading everything in it. This is because it's valid for
976 // one endpoint to write some data and close their pipe immediately. Even
977 // though the other end can't write anymore, it should still get all the data.
978 switch (error) {
979 case ERROR_READ_SHUTDOWN:
980 // The other side was cleanly closed, so this isn't actually an error.
981 DVLOG(1) << "MessagePipeDispatcher read error (shutdown)";
982 break;
983 case ERROR_READ_BROKEN:
984 DVLOG(1) << "MessagePipeDispatcher read error (connection broken)";
985 break;
986 case ERROR_READ_BAD_MESSAGE:
987 // Receiving a bad message means either a bug, data corruption, or
988 // malicious attack (probably due to some other bug).
989 LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)";
990 break;
991 case ERROR_READ_UNKNOWN:
992 LOG(ERROR) << "MessagePipeDispatcher read error (unknown)";
993 break;
994 case ERROR_WRITE:
995 // Write errors are slightly notable: they probably shouldn't happen under
996 // normal operation (but maybe the other side crashed).
997 LOG(WARNING) << "MessagePipeDispatcher write error";
998 DCHECK_EQ(write_error_, false) << "Should only get one write error.";
999 write_error_ = true;
1000 break;
1001 }
1002
1003 bool call_release = false;
1004 if (started_transport_.Try()) {
1005 base::AutoLock locker(lock());
1006 if (channel_ && error != ERROR_WRITE) {
1007 if (transferable_) {
1008 channel_->Shutdown();
1009 } else {
1010 CHECK_NE(non_transferable_state_, CLOSED);
1011 internal::g_broker->CloseMessagePipe(pipe_id_, this);
1012 non_transferable_state_ = CLOSED;
1013 }
1014 channel_ = nullptr;
1015 if (close_requested_) {
1016 // Balance AddRef in CloseOnIO.
1017 call_release = true;
1018 }
1019 } else if (!channel_ && !transferable_ &&
1020 non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
1021 // Balance AddRef in CloseOnIO.
1022 call_release = true;
1023 }
1024 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
1025 started_transport_.Release();
1026 } else {
1027 // We must be waiting to call ReleaseHandle. It will call Shutdown.
1028 }
1029
1030 if (call_release)
1031 Release();
1032 }
1033
1034 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
1035 MessageInTransit* message,
1036 std::vector<DispatcherTransport>* transports) {
1037 DCHECK(!message->has_dispatchers());
1038
1039 // You're not allowed to send either handle to a message pipe over the message
1040 // pipe, so check for this. (The case of trying to write a handle to itself is
1041 // taken care of by |Core|. That case kind of makes sense, but leads to
1042 // complications if, e.g., both sides try to do the same thing with their
1043 // respective handles simultaneously. The other case, of trying to write the
1044 // peer handle to a handle, doesn't make sense -- since no handle will be
1045 // available to read the message from.)
1046 for (size_t i = 0; i < transports->size(); i++) {
1047 if (!(*transports)[i].is_valid())
1048 continue;
1049 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
1050 MessagePipeDispatcher* mp =
1051 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
1052 if (transferable_ && mp->transferable_ &&
1053 channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
1054 // The other case should have been disallowed by |Core|. (Note: |port|
1055 // is the peer port of the handle given to |WriteMessage()|.)
1056 return MOJO_RESULT_INVALID_ARGUMENT;
1057 } else if (!transferable_ && !mp->transferable_ &&
1058 pipe_id_ == mp->pipe_id_) {
1059 return MOJO_RESULT_INVALID_ARGUMENT;
1060 }
1061 }
1062 }
1063
1064 // Clone the dispatchers and attach them to the message. (This must be done as
1065 // a separate loop, since we want to leave the dispatchers alone on failure.)
1066 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
1067 dispatchers->reserve(transports->size());
1068 for (size_t i = 0; i < transports->size(); i++) {
1069 if ((*transports)[i].is_valid()) {
1070 dispatchers->push_back(
1071 (*transports)[i].CreateEquivalentDispatcherAndClose());
1072 } else {
1073 LOG(WARNING) << "Enqueueing null dispatcher";
1074 dispatchers->push_back(nullptr);
1075 }
1076 }
1077 message->SetDispatchers(std::move(dispatchers));
1078 return MOJO_RESULT_OK;
1079 }
1080
1081 void MessagePipeDispatcher::RequestNontransferableChannel() {
1082 lock().AssertAcquired();
1083 CHECK(!transferable_);
1084 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
1085 non_transferable_state_ = CONNECT_CALLED;
1086 #if !defined(OFFICIAL_BUILD)
1087 non_transferable_bound_stack_.reset(new base::debug::StackTrace);
1088 #endif
1089
1090 // PostTask since the broker can call us back synchronously.
1091 internal::g_io_thread_task_runner->PostTask(
1092 FROM_HERE,
1093 base::Bind(&Broker::ConnectMessagePipe,
1094 base::Unretained(internal::g_broker), pipe_id_,
1095 base::Unretained(this)));
1096 } 612 }
1097 613
1098 } // namespace edk 614 } // namespace edk
1099 } // namespace mojo 615 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698