Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(129)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: some review comments Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/system/configuration.h"
12 #include "mojo/edk/system/message_in_transit.h"
13 #include "mojo/edk/system/options_validation.h"
14 #include "mojo/edk/system/transport_data.h"
15
16 namespace mojo {
17 namespace edk {
18
19 // TODO(jam): do more tests on using channel on same thread if it supports it (
20 // i.e. with USE_CHROME_EDK and Windows). Also see ipc_channel_mojo.cc
21 bool g_use_channel_on_io = true;
22
23 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
24
25 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
26 size_t platform_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
27 size_t read_buffer_size; // any bytes after this are serialized messages
28 };
29
30 // MessagePipeDispatcher -------------------------------------------------------
31
32 const MojoCreateMessagePipeOptions
33 MessagePipeDispatcher::kDefaultCreateOptions = {
34 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
35 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
36
37 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
38 const MojoCreateMessagePipeOptions* in_options,
39 MojoCreateMessagePipeOptions* out_options) {
40 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
41 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
42
43 *out_options = kDefaultCreateOptions;
44 if (!in_options)
45 return MOJO_RESULT_OK;
46
47 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
48 if (!reader.is_valid())
49 return MOJO_RESULT_INVALID_ARGUMENT;
50
51 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
52 return MOJO_RESULT_OK;
53 if ((reader.options().flags & ~kKnownFlags))
54 return MOJO_RESULT_UNIMPLEMENTED;
55 out_options->flags = reader.options().flags;
56
57 // Checks for fields beyond |flags|:
58
59 // (Nothing here yet.)
60
61 return MOJO_RESULT_OK;
62 }
63
64 void MessagePipeDispatcher::Init(ScopedPlatformHandle message_pipe) {
65 InitWithReadBuffer(message_pipe.Pass(), nullptr, 0);
66 }
67
68 void MessagePipeDispatcher::InitWithReadBuffer(
69 ScopedPlatformHandle message_pipe,
70 char* data,
71 size_t size) {
72 if (message_pipe.get().is_valid()) {
73 channel_ = RawChannel::Create(message_pipe.Pass());
74
75
76
77
78 // TODO(jam): pass this in Init call....
79 if (size)
80 channel_->SetInitialReadBufferData(data, size);
81 if (g_use_channel_on_io) {
82 internal::g_io_thread_task_runner->PostTask(
83 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
84 } else {
85 InitOnIO();
86 }
87 // TODO(jam): optimize for when running on IO thread
88 }
89 }
90
91 void MessagePipeDispatcher::InitOnIO() {
92 base::AutoLock locker(lock());
93 calling_init_ = true;
94 if (channel_)
95 channel_->Init(this);
96 calling_init_ = false;
97 }
98
99 void MessagePipeDispatcher::CloseOnIO() {
100 base::AutoLock locker(lock());
101
102 // TODO(jam) CLEANUP! this should be done inside RawChannel.....
103 if (channel_) {
104 channel_->Shutdown();
105 channel_ = nullptr;
106 }
107 }
108
109 Dispatcher::Type MessagePipeDispatcher::GetType() const {
110 return Type::MESSAGE_PIPE;
111 }
112
113
114
115 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
116 // best way we want to share this. Need to also consider posix which does
117 // require access to the RawChannel.
118 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
119 size_t num_platform_handles,
120 const void* platform_handle_table) {
121 // TODO(jam): this code will have to be updated once it's used in a sandbox
122 // and the receiving process doesn't have duplicate permission for the
123 // receiver. Once there's a broker and we have a connection to it (possibly
124 // through ConnectionManager), then we can make a sync IPC to it here to get a
125 // token for this handle, and it will duplicate the handle to is process. Then
126 // we pass the token to the receiver, which will then make a sync call to the
127 // broker to get a duplicated handle. This will also allow us to avoid leaks
128 // of the handle if the receiver dies, since the broker can notice that.
129 DCHECK_GT(num_platform_handles, 0u);
130 ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector());
131
132 #if defined(OS_WIN)
133 const char* serialization_data =
134 static_cast<const char*>(platform_handle_table);
135 for (size_t i = 0; i < num_platform_handles; i++) {
136 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data);
137 serialization_data += sizeof(DWORD);
138 HANDLE source_handle = *reinterpret_cast<const HANDLE*>(serialization_data);
139 serialization_data += sizeof(HANDLE);
140 base::Process sender =
141 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
142 DCHECK(sender.IsValid());
143 HANDLE target_handle = NULL;
144 BOOL dup_result =
145 DuplicateHandle(sender.Handle(), source_handle,
146 base::GetCurrentProcessHandle(), &target_handle, 0,
147 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
148 DCHECK(dup_result);
149 rv->push_back(PlatformHandle(target_handle));
150 }
151 #else
152 NOTREACHED() << "TODO(jam): implement";
153 #endif
154 return rv.Pass();
155 }
156
157 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
158 const void* source,
159 size_t size,
160 PlatformHandleVector* platform_handles) {
161 const SerializedMessagePipeHandleDispatcher* serialization =
162 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
163 size_t platform_handle_index = serialization->platform_handle_index;
164
165
166 // Starts off invalid, which is what we want.
167 PlatformHandle platform_handle;
168
169 if (platform_handle_index != kInvalidMessagePipeHandleIndex) {
170 if (!platform_handles ||
171 platform_handle_index >= platform_handles->size()) {
172 LOG(ERROR)
173 << "Invalid serialized platform handle dispatcher (missing handles)";
174 return nullptr;
175 }
176
177 // We take ownership of the handle, so we have to invalidate the one in
178 // |platform_handles|.
179 std::swap(platform_handle, (*platform_handles)[platform_handle_index]);
180 }
181
182
183 // TODO(jam): temporary until we send message_queue_ via shared memory.
184 size -= sizeof(SerializedMessagePipeHandleDispatcher);
185 const char* messages = static_cast<const char*>(source);
186 messages += sizeof(SerializedMessagePipeHandleDispatcher);
187
188
189 char* initial_read_data = nullptr;
190 size_t initial_read_size = 0;
191
192 if (serialization->read_buffer_size) {
193 initial_read_data = const_cast<char*>(messages);
194 initial_read_size = serialization->read_buffer_size;
195
196 messages += initial_read_size;
197 size -= initial_read_size;
198 }
199
200 scoped_refptr<MessagePipeDispatcher> rv(
201 Create(MessagePipeDispatcher::kDefaultCreateOptions));
202 rv->InitWithReadBuffer(
203 ScopedPlatformHandle(platform_handle),
204 initial_read_data, initial_read_size);
205
206 while (size) {
207 size_t message_size;
208 CHECK(MessageInTransit::GetNextMessageSize(
209 messages, size, &message_size));
210 MessageInTransit::View message_view(message_size, messages);
211 size -= message_size;
212 messages += message_size;
213
214 // copied from RawChannel::OnReadCompleted
215 // TODO(jam): don't copy
216 ScopedPlatformHandleVectorPtr platform_handles;
217 if (message_view.transport_data_buffer()) {
218 size_t num_platform_handles;
219 const void* platform_handle_table;
220 TransportData::GetPlatformHandleTable(
221 message_view.transport_data_buffer(), &num_platform_handles,
222 &platform_handle_table);
223
224 if (num_platform_handles > 0) {
225 platform_handles =
226 GetReadPlatformHandles(num_platform_handles,
227 platform_handle_table).Pass();
228 if (!platform_handles) {
229 LOG(ERROR) << "Invalid number of platform handles received";
230 return nullptr;
231 }
232 }
233 }
234
235
236 // copied below from OnReadMessage
237 // TODO(jam): don't copy
238 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
239 if (message_view.transport_data_buffer_size() > 0) {
240 DCHECK(message_view.transport_data_buffer());
241 message->SetDispatchers(TransportData::DeserializeDispatchers(
242 message_view.transport_data_buffer(),
243 message_view.transport_data_buffer_size(), platform_handles.Pass()));
244 }
245
246 rv->message_queue_.AddMessage(message.Pass());
247 }
248
249 return rv;
250 }
251
252 MessagePipeDispatcher::MessagePipeDispatcher()
253 : channel_(nullptr),
254 serialized_(false),
255 calling_init_(false),
256 error_(false) {
257 }
258
259 MessagePipeDispatcher::~MessagePipeDispatcher() {
260 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
261 DCHECK(!channel_);
262 }
263
264 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
265 lock().AssertAcquired();
266 awakable_list_.CancelAll();
267 }
268
269 void MessagePipeDispatcher::CloseImplNoLock() {
270 lock().AssertAcquired();
271 if (g_use_channel_on_io) {
272 internal::g_io_thread_task_runner->PostTask(
273 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
274 } else {
275 CloseOnIO();
276 }
277 }
278
279 void MessagePipeDispatcher::SerializeInternal() {
280 // need to stop watching handle immediately, even tho not on IO thread, so
281 // that other messages aren't read after this.
282 {
283 if (channel_) {
284 serialized_platform_handle_ =
285 channel_->ReleaseHandle(&serialized_read_buffer_).release();
286 channel_ = nullptr;
287 } else {
288 // It's valid that the other side wrote some data and closed its end.
289 }
290 }
291
292 DCHECK(serialized_message_queue_.empty());
293 // see comment in method below, this is only temporary till we implement a
294 // solution with shared buffer
295 while (!message_queue_.IsEmpty()) {
296 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
297 size_t cur_size = serialized_message_queue_.size();
298
299
300 // When MojoWriteMessage is called, the MessageInTransit doesn't have
301 // dispatchers set and CreateEquivaent... is called since the dispatchers
302 // can be referenced by others. here dispatchers aren't referenced by
303 // others, but rawchannel can still call to them. so since we dont call
304 // createequiv, manually call TransportStarted and TransportEnd.
305 DispatcherVector dispatchers;
306 if (message->has_dispatchers())
307 dispatchers = *message->dispatchers();
308 for (size_t i = 0; i < dispatchers.size(); ++i)
309 dispatchers[i]->TransportStarted();
310
311 //TODO(jam): this handling for dispatchers only works on windows where we
312 //send transportdata as bytes instead of as parameters to sendmsg.
313 message->SerializeAndCloseDispatchers();
314 // cont'd below
315
316
317 size_t main_buffer_size = message->main_buffer_size();
318 size_t transport_data_buffer_size = message->transport_data() ?
319 message->transport_data()->buffer_size() : 0;
320 size_t total_size = message->total_size();
321
322 serialized_message_queue_.resize(cur_size + total_size);
323 memcpy(&serialized_message_queue_[cur_size], message->main_buffer(),
324 main_buffer_size);
325
326 // cont'd
327 if (transport_data_buffer_size != 0) {
328 #if defined(OS_WIN)
329 // TODO(jam): copied from RawChannelWin::WriteNoLock(
330 if (RawChannel::GetSerializedPlatformHandleSize()) {
331 char* serialization_data =
332 static_cast<char*>(message->transport_data()->buffer()) +
333 message->transport_data()->platform_handle_table_offset();
334 PlatformHandleVector* all_platform_handles =
335 message->transport_data()->platform_handles();
336 if (all_platform_handles) {
337 DWORD current_process_id = base::GetCurrentProcId();
338 for (size_t i = 0; i < all_platform_handles->size(); i++) {
339 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id;
340 serialization_data += sizeof(DWORD);
341 *reinterpret_cast<HANDLE*>(serialization_data) =
342 all_platform_handles->at(i).handle;
343 serialization_data += sizeof(HANDLE);
344 all_platform_handles->at(i) = PlatformHandle();
345 }
346 }
347 }
348
349 memcpy(&serialized_message_queue_[
350 cur_size + total_size - transport_data_buffer_size],
351 message->transport_data()->buffer(), transport_data_buffer_size);
352 #else
353 NOTREACHED() << "TODO(jam) implement";
354 #endif
355 }
356
357 for (size_t i = 0; i < dispatchers.size(); ++i)
358 dispatchers[i]->TransportEnded();
359 }
360
361 serialized_ = true;
362 }
363
364 scoped_refptr<Dispatcher>
365 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
366 lock().AssertAcquired();
367
368 SerializeInternal();
369
370 // TODO(vtl): Currently, there are no options, so we just use
371 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
372 // too.
373 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
374 rv->serialized_platform_handle_ = serialized_platform_handle_;
375 serialized_platform_handle_ = PlatformHandle();
376 serialized_message_queue_.swap(rv->serialized_message_queue_);
377 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
378 rv->serialized_ = true;
379 return scoped_refptr<Dispatcher>(rv.get());
380 }
381
382 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
383 const void* bytes,
384 uint32_t num_bytes,
385 std::vector<DispatcherTransport>* transports,
386 MojoWriteMessageFlags flags) {
387
388 DCHECK(!transports ||
389 (transports->size() > 0 &&
390 transports->size() <= GetConfiguration().max_message_num_handles));
391
392 lock().AssertAcquired();
393
394 if (!channel_) {
395 DCHECK(error_);
396 return MOJO_RESULT_FAILED_PRECONDITION;
397 }
398
399 if (num_bytes > GetConfiguration().max_message_num_bytes)
400 return MOJO_RESULT_RESOURCE_EXHAUSTED;
401 scoped_ptr<MessageInTransit> message(new MessageInTransit(
402 MessageInTransit::Type::MESSAGE, num_bytes, bytes));
403 if (transports) {
404 MojoResult result = AttachTransportsNoLock(message.get(), transports);
405 if (result != MOJO_RESULT_OK)
406 return result;
407 }
408
409 // TODO(jam): pass in GetSerializedPlatformHandleSize instead of RawChannel
410 message->SerializeAndCloseDispatchers();
411 channel_->WriteMessage(message.Pass());
412
413 return MOJO_RESULT_OK;
414 }
415
416 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
417 void* bytes,
418 uint32_t* num_bytes,
419 DispatcherVector* dispatchers,
420 uint32_t* num_dispatchers,
421 MojoReadMessageFlags flags) {
422 lock().AssertAcquired();
423 DCHECK(!dispatchers || dispatchers->empty());
424
425 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
426 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
427
428 if (message_queue_.IsEmpty()) {
429 return error_ ? MOJO_RESULT_FAILED_PRECONDITION
430 : MOJO_RESULT_SHOULD_WAIT;
431 }
432
433 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
434 // and release the lock immediately.
435 bool enough_space = true;
436 MessageInTransit* message = message_queue_.PeekMessage();
437 if (num_bytes)
438 *num_bytes = message->num_bytes();
439 if (message->num_bytes() <= max_bytes)
440 memcpy(bytes, message->bytes(), message->num_bytes());
441 else
442 enough_space = false;
443
444 if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
445 if (num_dispatchers)
446 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
447 if (enough_space) {
448 if (queued_dispatchers->empty()) {
449 // Nothing to do.
450 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
451 DCHECK(dispatchers);
452 dispatchers->swap(*queued_dispatchers);
453 } else {
454 enough_space = false;
455 }
456 }
457 } else {
458 if (num_dispatchers)
459 *num_dispatchers = 0;
460 }
461
462 message = nullptr;
463
464 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
465 message_queue_.DiscardMessage();
466
467 // Now it's empty, thus no longer readable.
468 if (message_queue_.IsEmpty()) {
469 // It's currently not possible to wait for non-readability, but we should
470 // do the state change anyway.
471 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
472 }
473 }
474
475 if (!enough_space)
476 return MOJO_RESULT_RESOURCE_EXHAUSTED;
477
478 return MOJO_RESULT_OK;
479 }
480
481 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
482 const {
483 lock().AssertAcquired();
484 // return message_pipe_->GetHandleSignalsState(port_);
485
486 HandleSignalsState rv;
487 if (!message_queue_.IsEmpty()) {
488 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
489 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
490 }
491 if (!error_) {
492 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
493 rv.satisfiable_signals |=
494 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
495 } else {
496 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
497 }
498 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
499 return rv;
500 }
501
502 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
503 Awakable* awakable,
504 MojoHandleSignals signals,
505 uint32_t context,
506 HandleSignalsState* signals_state) {
507 lock().AssertAcquired();
508 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
509 if (state.satisfies(signals)) {
510 if (signals_state)
511 *signals_state = state;
512 return MOJO_RESULT_ALREADY_EXISTS;
513 }
514 if (!state.can_satisfy(signals)) {
515 if (signals_state)
516 *signals_state = state;
517 return MOJO_RESULT_FAILED_PRECONDITION;
518 }
519
520 awakable_list_.Add(awakable, signals, context);
521 return MOJO_RESULT_OK;
522 }
523
524 void MessagePipeDispatcher::RemoveAwakableImplNoLock(
525 Awakable* awakable,
526 HandleSignalsState* signals_state) {
527 lock().AssertAcquired();
528
529 awakable_list_.Remove(awakable);
530 if (signals_state)
531 *signals_state = GetHandleSignalsStateImplNoLock();
532 }
533
534 void MessagePipeDispatcher::StartSerializeImplNoLock(
535 size_t* max_size,
536 size_t* max_platform_handles) {
537 // see comment in dispatcher::startserialize
538 // DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
539
540 if (!serialized_) {
541 // handles the case where we have messages read off rawchannel but not
542 // ready by MojoReadMessage.
543 SerializeInternal();
544 }
545
546 *max_platform_handles = serialized_platform_handle_.is_valid() ? 1 : 0;
547
548 DCHECK_EQ(serialized_message_queue_.size() %
549 MessageInTransit::kMessageAlignment, 0U);
550 *max_size = sizeof(SerializedMessagePipeHandleDispatcher) +
551 serialized_message_queue_.size() +
552 serialized_read_buffer_.size();
553
554 DCHECK_LE(*max_size, TransportData::kMaxSerializedDispatcherSize);
555 }
556
557 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
558 void* destination,
559 size_t* actual_size,
560 PlatformHandleVector* platform_handles) {
561 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
562
563 CloseImplNoLock();
564 SerializedMessagePipeHandleDispatcher* serialization =
565 static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
566 if (serialized_platform_handle_.is_valid()) {
567 serialization->platform_handle_index = platform_handles->size();
568 platform_handles->push_back(serialized_platform_handle_);
569 } else {
570 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
571 }
572 serialization->read_buffer_size = serialized_read_buffer_.size();
573
574 char* destination_char = static_cast<char*>(destination);
575 destination_char += sizeof(SerializedMessagePipeHandleDispatcher);
576
577 if (!serialized_read_buffer_.empty()) {
578 memcpy(destination_char, &serialized_read_buffer_[0],
579 serialized_read_buffer_.size());
580 destination_char += serialized_read_buffer_.size();
581 }
582
583
584 if (!serialized_message_queue_.empty()) {
585 memcpy(destination_char,
586 &serialized_message_queue_[0],
587 serialized_message_queue_.size());
588 }
589
590 *actual_size =
591 sizeof(SerializedMessagePipeHandleDispatcher) +
592 serialized_message_queue_.size() +
593 serialized_read_buffer_.size();
594
595 return true;
596 }
597
598 void MessagePipeDispatcher::TransportStarted() {
599 started_transport_.Acquire();
600 }
601
602 void MessagePipeDispatcher::TransportEnded() {
603 started_transport_.Release();
604
605 base::AutoLock locker(lock());
606
607 // If transporting of MPD failed, we might have got more data and didn't
608 // awake for.
609 // TODO(jam): should we care about only alerting if it was empty before
610 // TransportStarted?
611 if (!message_queue_.IsEmpty())
612 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
613 }
614
615 void MessagePipeDispatcher::OnReadMessage(
616 const MessageInTransit::View& message_view,
617 ScopedPlatformHandleVectorPtr platform_handles) {
618 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
619 if (message_view.transport_data_buffer_size() > 0) {
620 DCHECK(message_view.transport_data_buffer());
621 message->SetDispatchers(TransportData::DeserializeDispatchers(
622 message_view.transport_data_buffer(),
623 message_view.transport_data_buffer_size(), platform_handles.Pass()));
624 }
625
626 if (started_transport_.Try()) {
627 // we're not in the middle of being sent
628
629 // Can get synchronously called back in Init if there was initial data.
630 scoped_ptr<base::AutoLock> locker;
631 if (!calling_init_) {
632 locker.reset(new base::AutoLock(lock()));
633 }
634
635 bool was_empty = message_queue_.IsEmpty();
636 message_queue_.AddMessage(message.Pass());
637 if (was_empty)
638 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
639
640 started_transport_.Release();
641 } else {
642
643 // if RawChannel is calling OnRead, that means it has its read_lock_
644 // acquired. that means StartSerialize can't be accessing message queue as
645 // it waits on releasehandle first which acquires readlock_!
646 message_queue_.AddMessage(message.Pass());
647 }
648 }
649
650 void MessagePipeDispatcher::OnError(Error error) {
651 switch (error) {
652 case ERROR_READ_SHUTDOWN:
653 // The other side was cleanly closed, so this isn't actually an error.
654 DVLOG(1) << "MessagePipeDispatcher read error (shutdown)";
655 break;
656 case ERROR_READ_BROKEN:
657 LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)";
658 break;
659 case ERROR_READ_BAD_MESSAGE:
660 // Receiving a bad message means either a bug, data corruption, or
661 // malicious attack (probably due to some other bug).
662 LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)";
663 break;
664 case ERROR_READ_UNKNOWN:
665 LOG(ERROR) << "MessagePipeDispatcher read error (unknown)";
666 break;
667 case ERROR_WRITE:
668 // Write errors are slightly notable: they probably shouldn't happen under
669 // normal operation (but maybe the other side crashed).
670 LOG(WARNING) << "MessagePipeDispatcher write error";
671 break;
672 }
673
674 error_ = true;
675 if (started_transport_.Try()) {
676 base::AutoLock locker(lock());
677 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
678
679 base::MessageLoop::current()->PostTask(
680 FROM_HERE,
681 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
682 channel_ = nullptr;
683 started_transport_.Release();
684 } else {
685 // We must be waiting to call ReleaseHandle. It will call Shutdown.
686 }
687 }
688
689 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
690 MessageInTransit* message,
691 std::vector<DispatcherTransport>* transports) {
692 DCHECK(!message->has_dispatchers());
693
694 // You're not allowed to send either handle to a message pipe over the message
695 // pipe, so check for this. (The case of trying to write a handle to itself is
696 // taken care of by |Core|. That case kind of makes sense, but leads to
697 // complications if, e.g., both sides try to do the same thing with their
698 // respective handles simultaneously. The other case, of trying to write the
699 // peer handle to a handle, doesn't make sense -- since no handle will be
700 // available to read the message from.)
701 for (size_t i = 0; i < transports->size(); i++) {
702 if (!(*transports)[i].is_valid())
703 continue;
704 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
705 MessagePipeDispatcher* mp =
706 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
707 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
708 // The other case should have been disallowed by |Core|. (Note: |port|
709 // is the peer port of the handle given to |WriteMessage()|.)
710 return MOJO_RESULT_INVALID_ARGUMENT;
711 }
712 }
713 }
714
715 // Clone the dispatchers and attach them to the message. (This must be done as
716 // a separate loop, since we want to leave the dispatchers alone on failure.)
717 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
718 dispatchers->reserve(transports->size());
719 for (size_t i = 0; i < transports->size(); i++) {
720 if ((*transports)[i].is_valid()) {
721 dispatchers->push_back(
722 (*transports)[i].CreateEquivalentDispatcherAndClose());
723 } else {
724 LOG(WARNING) << "Enqueueing null dispatcher";
725 dispatchers->push_back(nullptr);
726 }
727 }
728 message->SetDispatchers(dispatchers.Pass());
729 return MOJO_RESULT_OK;
730 }
731
732 } // namespace edk
733 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698