Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(488)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: move to mojo::edk namespace in preparation for runtim flag Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/system/configuration.h"
12 #include "mojo/edk/system/memory.h"
13 #include "mojo/edk/system/message_in_transit.h"
14 #include "mojo/edk/system/options_validation.h"
15 #include "mojo/edk/system/transport_data.h"
16
17 // TODO(jam): do more tests on using channel on same thread if it supports it (
18 // i.e. with USE_CHROME_EDK and Windows). Also see ipc_channel_mojo.cc
19 bool g_use_channel_on_io = true;
20
21 namespace mojo {
22 namespace edk {
23
24 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
25
26 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
27 size_t platform_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
28 size_t read_buffer_size; // any bytes after this are serialized messages
29 };
30
31 // MessagePipeDispatcher -------------------------------------------------------
32
33 const MojoCreateMessagePipeOptions
34 MessagePipeDispatcher::kDefaultCreateOptions = {
35 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
36 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
37
38 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
39 UserPointer<const MojoCreateMessagePipeOptions> in_options,
40 MojoCreateMessagePipeOptions* out_options) {
41 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
42 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
43
44 *out_options = kDefaultCreateOptions;
45 if (in_options.IsNull())
46 return MOJO_RESULT_OK;
47
48 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
49 if (!reader.is_valid())
50 return MOJO_RESULT_INVALID_ARGUMENT;
51
52 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
53 return MOJO_RESULT_OK;
54 if ((reader.options().flags & ~kKnownFlags))
55 return MOJO_RESULT_UNIMPLEMENTED;
56 out_options->flags = reader.options().flags;
57
58 // Checks for fields beyond |flags|:
59
60 // (Nothing here yet.)
61
62 return MOJO_RESULT_OK;
63 }
64
65 void MessagePipeDispatcher::Init(ScopedPlatformHandle message_pipe) {
66 InitWithReadBuffer(message_pipe.Pass(), nullptr, 0);
67 }
68
69 void MessagePipeDispatcher::InitWithReadBuffer(
70 ScopedPlatformHandle message_pipe,
71 char* data,
72 size_t size) {
73 if (message_pipe.get().is_valid()) {
74 channel_ = RawChannel::Create(message_pipe.Pass());
75
76
77
78
79 // TODO(jam): pass this in Init call....
80 if (size)
81 channel_->SetInitialReadBufferData(data, size);
82 if (g_use_channel_on_io) {
83 internal::g_io_thread_task_runner->PostTask(
84 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
85 } else {
86 InitOnIO();
87 }
88 // TODO(jam): optimize for when running on IO thread
89 }
90 }
91
92 void MessagePipeDispatcher::InitOnIO() {
93 base::AutoLock locker(lock());
94 calling_init_ = true;
95 if (channel_)
96 channel_->Init(this);
97 calling_init_ = false;
98 }
99
100 void MessagePipeDispatcher::CloseOnIO() {
101 base::AutoLock locker(lock());
102
103 // TODO(jam) CLEANUP! this should be done inside RawChannel.....
104 if (channel_) {
105 channel_->Shutdown();
106 channel_ = nullptr;
107 }
108 }
109
110 Dispatcher::Type MessagePipeDispatcher::GetType() const {
111 return Type::MESSAGE_PIPE;
112 }
113
114
115
116 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
117 // best way we want to share this. Need to also consider posix which does
118 // require access to the RawChannel.
119 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
120 size_t num_platform_handles,
121 const void* platform_handle_table) {
122 // TODO(jam): this code will have to be updated once it's used in a sandbox
123 // and the receiving process doesn't have duplicate permission for the
124 // receiver. Once there's a broker and we have a connection to it (possibly
125 // through ConnectionManager), then we can make a sync IPC to it here to get a
126 // token for this handle, and it will duplicate the handle to is process. Then
127 // we pass the token to the receiver, which will then make a sync call to the
128 // broker to get a duplicated handle. This will also allow us to avoid leaks
129 // of the handle if the receiver dies, since the broker can notice that.
130 DCHECK_GT(num_platform_handles, 0u);
131 ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector());
132
133 #if defined(OS_WIN)
134 const char* serialization_data =
135 static_cast<const char*>(platform_handle_table);
136 for (size_t i = 0; i < num_platform_handles; i++) {
137 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data);
138 serialization_data += sizeof(DWORD);
139 HANDLE source_handle = *reinterpret_cast<const HANDLE*>(serialization_data);
140 serialization_data += sizeof(HANDLE);
141 base::Process sender =
142 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
143 DCHECK(sender.IsValid());
144 HANDLE target_handle = NULL;
145 BOOL dup_result =
146 DuplicateHandle(sender.Handle(), source_handle,
147 base::GetCurrentProcessHandle(), &target_handle, 0,
148 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
149 DCHECK(dup_result);
150 rv->push_back(PlatformHandle(target_handle));
151 }
152 #else
153 NOTREACHED() << "TODO(jam): implement";
154 #endif
155 return rv.Pass();
156 }
157
158 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
159 const void* source,
160 size_t size,
161 PlatformHandleVector* platform_handles) {
162 const SerializedMessagePipeHandleDispatcher* serialization =
163 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
164 size_t platform_handle_index = serialization->platform_handle_index;
165
166
167 // Starts off invalid, which is what we want.
168 PlatformHandle platform_handle;
169
170 if (platform_handle_index != kInvalidMessagePipeHandleIndex) {
171 if (!platform_handles ||
172 platform_handle_index >= platform_handles->size()) {
173 LOG(ERROR)
174 << "Invalid serialized platform handle dispatcher (missing handles)";
175 return nullptr;
176 }
177
178 // We take ownership of the handle, so we have to invalidate the one in
179 // |platform_handles|.
180 std::swap(platform_handle, (*platform_handles)[platform_handle_index]);
181 }
182
183
184 // TODO(jam): temporary until we send message_queue_ via shared memory.
185 size -= sizeof(SerializedMessagePipeHandleDispatcher);
186 const char* messages = static_cast<const char*>(source);
187 messages += sizeof(SerializedMessagePipeHandleDispatcher);
188
189
190 char* initial_read_data = nullptr;
191 size_t initial_read_size = 0;
192
193 if (serialization->read_buffer_size) {
194 initial_read_data = const_cast<char*>(messages);
195 initial_read_size = serialization->read_buffer_size;
196
197 messages += initial_read_size;
198 size -= initial_read_size;
199 }
200
201 scoped_refptr<MessagePipeDispatcher> rv(
202 Create(MessagePipeDispatcher::kDefaultCreateOptions));
203 rv->InitWithReadBuffer(
204 ScopedPlatformHandle(platform_handle),
205 initial_read_data, initial_read_size);
206
207 while (size) {
208 size_t message_size;
209 CHECK(MessageInTransit::GetNextMessageSize(
210 messages, size, &message_size));
211 MessageInTransit::View message_view(message_size, messages);
212 size -= message_size;
213 messages += message_size;
214
215 // copied from RawChannel::OnReadCompleted
216 // TODO(jam): don't copy
217 ScopedPlatformHandleVectorPtr platform_handles;
218 if (message_view.transport_data_buffer()) {
219 size_t num_platform_handles;
220 const void* platform_handle_table;
221 TransportData::GetPlatformHandleTable(
222 message_view.transport_data_buffer(), &num_platform_handles,
223 &platform_handle_table);
224
225 if (num_platform_handles > 0) {
226 platform_handles =
227 GetReadPlatformHandles(num_platform_handles,
228 platform_handle_table).Pass();
229 if (!platform_handles) {
230 LOG(ERROR) << "Invalid number of platform handles received";
231 return nullptr;
232 }
233 }
234 }
235
236
237 // copied below from OnReadMessage
238 // TODO(jam): don't copy
239 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
240 if (message_view.transport_data_buffer_size() > 0) {
241 DCHECK(message_view.transport_data_buffer());
242 message->SetDispatchers(TransportData::DeserializeDispatchers(
243 message_view.transport_data_buffer(),
244 message_view.transport_data_buffer_size(), platform_handles.Pass()));
245 }
246
247 rv->message_queue_.AddMessage(message.Pass());
248 }
249
250 return rv;
251 }
252
253 MessagePipeDispatcher::MessagePipeDispatcher()
254 : channel_(nullptr),
255 serialized_(false),
256 calling_init_(false),
257 error_(false) {
258 }
259
260 MessagePipeDispatcher::~MessagePipeDispatcher() {
261 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
262 DCHECK(!channel_);
263 }
264
265 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
266 lock().AssertAcquired();
267 awakable_list_.CancelAll();
268 }
269
270 void MessagePipeDispatcher::CloseImplNoLock() {
271 lock().AssertAcquired();
272 if (g_use_channel_on_io) {
273 internal::g_io_thread_task_runner->PostTask(
274 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
275 } else {
276 CloseOnIO();
277 }
278 }
279
280 void MessagePipeDispatcher::SerializeInternal() {
281 // need to stop watching handle immediately, even tho not on IO thread, so
282 // that other messages aren't read after this.
283 {
284 if (channel_) {
285 serialized_platform_handle_ =
286 channel_->ReleaseHandle(&serialized_read_buffer_).release();
287
288 channel_ = nullptr;
289 } else {
290 // It's valid that the other side wrote some data and closed its end.
291 }
292 }
293
294 DCHECK(serialized_message_queue_.empty());
295 // see comment in method below, this is only temporary till we implement a
296 // solution with shared buffer
297 while (!message_queue_.IsEmpty()) {
298 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
299 size_t cur_size = serialized_message_queue_.size();
300
301
302 // When MojoWriteMessage is called, the MessageInTransit doesn't have
303 // dispatchers set and CreateEquivaent... is called since the dispatchers
304 // can be referenced by others. here dispatchers aren't referenced by
305 // others, but rawchannel can still call to them. so since we dont call
306 // createequiv, manually call TransportStarted and TransportEnd.
307 DispatcherVector dispatchers;
308 if (message->has_dispatchers())
309 dispatchers = *message->dispatchers();
310 for (size_t i = 0; i < dispatchers.size(); ++i)
311 dispatchers[i]->TransportStarted();
312
313 //TODO(jam): this handling for dispatchers only works on windows where we
314 //send transportdata as bytes instead of as parameters to sendmsg.
315 message->SerializeAndCloseDispatchers();
316 // cont'd below
317
318
319 size_t main_buffer_size = message->main_buffer_size();
320 size_t transport_data_buffer_size = message->transport_data() ?
321 message->transport_data()->buffer_size() : 0;
322 size_t total_size = message->total_size();
323
324
325
326 serialized_message_queue_.resize(cur_size + total_size);
327 memcpy(&serialized_message_queue_[cur_size], message->main_buffer(),
328 main_buffer_size);
329
330 // cont'd
331 if (transport_data_buffer_size != 0) {
332 #if defined(OS_WIN)
333 // TODO(jam): copied from RawChannelWin::WriteNoLock(
334 if (channel_->GetSerializedPlatformHandleSize()) {
335 char* serialization_data =
336 static_cast<char*>(message->transport_data()->buffer()) +
337 message->transport_data()->platform_handle_table_offset();
338 PlatformHandleVector* all_platform_handles =
339 message->transport_data()->platform_handles();
340 if (all_platform_handles) {
341 DWORD current_process_id = base::GetCurrentProcId();
342 for (size_t i = 0; i < all_platform_handles->size(); i++) {
343 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id;
344 serialization_data += sizeof(DWORD);
345 *reinterpret_cast<HANDLE*>(serialization_data) =
346 all_platform_handles->at(i).handle;
347 serialization_data += sizeof(HANDLE);
348 all_platform_handles->at(i) = PlatformHandle();
349 }
350 }
351 }
352
353 memcpy(&serialized_message_queue_[
354 cur_size + total_size - transport_data_buffer_size],
355 message->transport_data()->buffer(), transport_data_buffer_size);
356 #else
357 NOTREACHED() << "TODO(jam) implement";
358 #endif
359 }
360
361 for (size_t i = 0; i < dispatchers.size(); ++i)
362 dispatchers[i]->TransportEnded();
363 }
364
365 serialized_ = true;
366 }
367
368 scoped_refptr<Dispatcher>
369 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
370 lock().AssertAcquired();
371
372 SerializeInternal();
373
374 // TODO(vtl): Currently, there are no options, so we just use
375 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
376 // too.
377 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
378 rv->channel_ = channel_;
379 channel_ = nullptr;
380
381
382 rv->serialized_platform_handle_ = serialized_platform_handle_;
383 serialized_platform_handle_ = PlatformHandle();
384 serialized_message_queue_.swap(rv->serialized_message_queue_);
385 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
386 rv->serialized_ = true;
387 return scoped_refptr<Dispatcher>(rv.get());
388 }
389
390 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
391 UserPointer<const void> bytes,
392 uint32_t num_bytes,
393 std::vector<DispatcherTransport>* transports,
394 MojoWriteMessageFlags flags) {
395
396 DCHECK(!transports ||
397 (transports->size() > 0 &&
398 transports->size() <= GetConfiguration().max_message_num_handles));
399
400 lock().AssertAcquired();
401
402 if (!channel_) {
403 DCHECK(error_);
404 return MOJO_RESULT_FAILED_PRECONDITION;
405 }
406
407 if (num_bytes > GetConfiguration().max_message_num_bytes)
408 return MOJO_RESULT_RESOURCE_EXHAUSTED;
409 scoped_ptr<MessageInTransit> message(new MessageInTransit(
410 MessageInTransit::Type::MESSAGE, num_bytes, bytes));
411 if (transports) {
412 MojoResult result = AttachTransportsNoLock(message.get(), transports);
413 if (result != MOJO_RESULT_OK)
414 return result;
415 }
416
417 // TODO(jam): pass in GetSerializedPlatformHandleSize instead of RawChannel
418 message->SerializeAndCloseDispatchers();
419 channel_->WriteMessage(message.Pass());
420
421 return MOJO_RESULT_OK;
422 }
423
424 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
425 UserPointer<void> bytes,
426 UserPointer<uint32_t> num_bytes,
427 DispatcherVector* dispatchers,
428 uint32_t* num_dispatchers,
429 MojoReadMessageFlags flags) {
430 lock().AssertAcquired();
431 // return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers,
432 // num_dispatchers, flags);
433
434 DCHECK(!dispatchers || dispatchers->empty());
435
436 const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get();
437 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
438
439 if (message_queue_.IsEmpty()) {
440 return error_ ? MOJO_RESULT_FAILED_PRECONDITION
441 : MOJO_RESULT_SHOULD_WAIT;
442 }
443
444 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
445 // and release the lock immediately.
446 bool enough_space = true;
447 MessageInTransit* message = message_queue_.PeekMessage();
448 if (!num_bytes.IsNull())
449 num_bytes.Put(message->num_bytes());
450 if (message->num_bytes() <= max_bytes)
451 bytes.PutArray(message->bytes(), message->num_bytes());
452 else
453 enough_space = false;
454
455 if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
456 if (num_dispatchers)
457 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
458 if (enough_space) {
459 if (queued_dispatchers->empty()) {
460 // Nothing to do.
461 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
462 DCHECK(dispatchers);
463 dispatchers->swap(*queued_dispatchers);
464 } else {
465 enough_space = false;
466 }
467 }
468 } else {
469 if (num_dispatchers)
470 *num_dispatchers = 0;
471 }
472
473 message = nullptr;
474
475 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
476 message_queue_.DiscardMessage();
477
478 // Now it's empty, thus no longer readable.
479 if (message_queue_.IsEmpty()) {
480 // It's currently not possible to wait for non-readability, but we should
481 // do the state change anyway.
482 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
483 }
484 }
485
486 if (!enough_space)
487 return MOJO_RESULT_RESOURCE_EXHAUSTED;
488
489 return MOJO_RESULT_OK;
490 }
491
492 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
493 const {
494 lock().AssertAcquired();
495 // return message_pipe_->GetHandleSignalsState(port_);
496
497 HandleSignalsState rv;
498 if (!message_queue_.IsEmpty()) {
499 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
500 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
501 }
502 if (!error_) {
503 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
504 rv.satisfiable_signals |=
505 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
506 } else {
507 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
508 }
509 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
510 return rv;
511 }
512
513 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
514 Awakable* awakable,
515 MojoHandleSignals signals,
516 uint32_t context,
517 HandleSignalsState* signals_state) {
518 lock().AssertAcquired();
519 // return message_pipe_->AddAwakable(port_, awakable, signals, context,
520 // signals_state);
521 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
522 if (state.satisfies(signals)) {
523 if (signals_state)
524 *signals_state = state;
525 return MOJO_RESULT_ALREADY_EXISTS;
526 }
527 if (!state.can_satisfy(signals)) {
528 if (signals_state)
529 *signals_state = state;
530 return MOJO_RESULT_FAILED_PRECONDITION;
531 }
532
533 awakable_list_.Add(awakable, signals, context);
534 return MOJO_RESULT_OK;
535 }
536
537 void MessagePipeDispatcher::RemoveAwakableImplNoLock(
538 Awakable* awakable,
539 HandleSignalsState* signals_state) {
540 lock().AssertAcquired();
541
542 awakable_list_.Remove(awakable);
543 if (signals_state)
544 *signals_state = GetHandleSignalsStateImplNoLock();
545 }
546
547 void MessagePipeDispatcher::StartSerializeImplNoLock(
548 size_t* max_size,
549 size_t* max_platform_handles) {
550 // see comment in dispatcher::startserialize
551 // DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
552
553 if (!serialized_) {
554 // handles the case where we have messages read off rawchannel but not
555 // ready by MojoReadMessage.
556 SerializeInternal();
557 }
558
559 *max_platform_handles = serialized_platform_handle_.is_valid() ? 1 : 0;
560
561 DCHECK_EQ(serialized_message_queue_.size() %
562 MessageInTransit::kMessageAlignment, 0U);
563 *max_size = sizeof(SerializedMessagePipeHandleDispatcher) +
564 serialized_message_queue_.size() +
565 serialized_read_buffer_.size();
566
567 DCHECK_LE(*max_size, TransportData::kMaxSerializedDispatcherSize);
568 }
569
570 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
571 void* destination,
572 size_t* actual_size,
573 PlatformHandleVector* platform_handles) {
574 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
575
576 CloseImplNoLock();
577 SerializedMessagePipeHandleDispatcher* serialization =
578 static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
579 if (serialized_platform_handle_.is_valid()) {
580 serialization->platform_handle_index = platform_handles->size();
581 platform_handles->push_back(serialized_platform_handle_);
582 } else {
583 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
584 }
585 serialization->read_buffer_size = serialized_read_buffer_.size();
586
587 char* destination_char = static_cast<char*>(destination);
588 destination_char += sizeof(SerializedMessagePipeHandleDispatcher);
589
590 if (!serialized_read_buffer_.empty()) {
591 memcpy(destination_char, &serialized_read_buffer_[0],
592 serialized_read_buffer_.size());
593 destination_char += serialized_read_buffer_.size();
594 }
595
596
597 if (!serialized_message_queue_.empty()) {
598 memcpy(destination_char,
599 &serialized_message_queue_[0],
600 serialized_message_queue_.size());
601 }
602
603 *actual_size =
604 sizeof(SerializedMessagePipeHandleDispatcher) +
605 serialized_message_queue_.size() +
606 serialized_read_buffer_.size();
607
608 return true;
609 }
610
611 void MessagePipeDispatcher::TransportStarted() {
612 started_transport_.Acquire();
613 }
614
615 void MessagePipeDispatcher::TransportEnded() {
616 started_transport_.Release();
617
618 base::AutoLock locker(lock());
619
620 // If transporting of MPD failed, we might have got more data and didn't
621 // awake for.
622 // TODO(jam): should we care about only alerting if it was empty before
623 // TransportStarted?
624 if (!message_queue_.IsEmpty())
625 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
626 }
627
628 void MessagePipeDispatcher::OnReadMessage(
629 const MessageInTransit::View& message_view,
630 ScopedPlatformHandleVectorPtr platform_handles) {
631 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
632 if (message_view.transport_data_buffer_size() > 0) {
633 DCHECK(message_view.transport_data_buffer());
634 message->SetDispatchers(TransportData::DeserializeDispatchers(
635 message_view.transport_data_buffer(),
636 message_view.transport_data_buffer_size(), platform_handles.Pass()));
637 }
638
639 if (started_transport_.Try()) {
640 // we're not in the middle of being sent
641
642 // Can get synchronously called back in Init if there was initial data.
643 scoped_ptr<base::AutoLock> locker;
644 if (!calling_init_) {
645 locker.reset(new base::AutoLock(lock()));
646 }
647
648 bool was_empty = message_queue_.IsEmpty();
649 message_queue_.AddMessage(message.Pass());
650 if (was_empty)
651 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
652
653 started_transport_.Release();
654 } else {
655
656 // if RawChannel is calling OnRead, that means it has its read_lock_
657 // acquired. that means StartSerialize can't be accessing message queue as
658 // it waits on releasehandle first which acquires readlock_!
659 message_queue_.AddMessage(message.Pass());
660 }
661 }
662
663 void MessagePipeDispatcher::OnError(Error error) {
664 switch (error) {
665 case ERROR_READ_SHUTDOWN:
666 // The other side was cleanly closed, so this isn't actually an error.
667 DVLOG(1) << "MessagePipeDispatcher read error (shutdown)";
668 break;
669 case ERROR_READ_BROKEN:
670 LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)";
671 break;
672 case ERROR_READ_BAD_MESSAGE:
673 // Receiving a bad message means either a bug, data corruption, or
674 // malicious attack (probably due to some other bug).
675 LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)";
676 break;
677 case ERROR_READ_UNKNOWN:
678 LOG(ERROR) << "MessagePipeDispatcher read error (unknown)";
679 break;
680 case ERROR_WRITE:
681 // Write errors are slightly notable: they probably shouldn't happen under
682 // normal operation (but maybe the other side crashed).
683 LOG(WARNING) << "MessagePipeDispatcher write error";
684 break;
685 }
686
687 error_ = true;
688 if (started_transport_.Try()) {
689 base::AutoLock locker(lock());
690 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
691
692 base::MessageLoop::current()->PostTask(
693 FROM_HERE,
694 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
695 channel_ = nullptr;
696 started_transport_.Release();
697 } else {
698 // We must be waiting to call ReleaseHandle. It will call Shutdown.
699 }
700 }
701
702 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
703 MessageInTransit* message,
704 std::vector<DispatcherTransport>* transports) {
705 DCHECK(!message->has_dispatchers());
706
707 // You're not allowed to send either handle to a message pipe over the message
708 // pipe, so check for this. (The case of trying to write a handle to itself is
709 // taken care of by |Core|. That case kind of makes sense, but leads to
710 // complications if, e.g., both sides try to do the same thing with their
711 // respective handles simultaneously. The other case, of trying to write the
712 // peer handle to a handle, doesn't make sense -- since no handle will be
713 // available to read the message from.)
714 for (size_t i = 0; i < transports->size(); i++) {
715 if (!(*transports)[i].is_valid())
716 continue;
717 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
718 MessagePipeDispatcher* mp =
719 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
720 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
721 // The other case should have been disallowed by |Core|. (Note: |port|
722 // is the peer port of the handle given to |WriteMessage()|.)
723 return MOJO_RESULT_INVALID_ARGUMENT;
724 }
725 }
726 }
727
728 // Clone the dispatchers and attach them to the message. (This must be done as
729 // a separate loop, since we want to leave the dispatchers alone on failure.)
730 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
731 dispatchers->reserve(transports->size());
732 for (size_t i = 0; i < transports->size(); i++) {
733 if ((*transports)[i].is_valid()) {
734 dispatchers->push_back(
735 (*transports)[i].CreateEquivalentDispatcherAndClose());
736 } else {
737 LOG(WARNING) << "Enqueueing null dispatcher";
738 dispatchers->push_back(nullptr);
739 }
740 }
741 message->SetDispatchers(dispatchers.Pass());
742 return MOJO_RESULT_OK;
743 }
744
745 } // namespace edk
746 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698