Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(778)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/system/configuration.h"
12 #include "mojo/edk/system/memory.h"
13 #include "mojo/edk/system/message_in_transit.h"
14 #include "mojo/edk/system/options_validation.h"
15 #include "mojo/edk/system/transport_data.h"
16
17 // TODO(jam): do more tests on using channel on same thread if it supports it (
18 // i.e. with USE_CHROME_EDK and Windows). Also see ipc_channel_mojo.cc
19 bool g_use_channel_on_io = true;
20
21 namespace mojo {
22 namespace system {
23
24 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
25
26 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
27 size_t platform_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
28 size_t read_buffer_size; // any bytes after this are serialized messages
29 };
30
31 // MessagePipeDispatcher -------------------------------------------------------
32
33 const MojoCreateMessagePipeOptions
34 MessagePipeDispatcher::kDefaultCreateOptions = {
35 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
36 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
37
38 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
39 UserPointer<const MojoCreateMessagePipeOptions> in_options,
40 MojoCreateMessagePipeOptions* out_options) {
41 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
42 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
43
44 *out_options = kDefaultCreateOptions;
45 if (in_options.IsNull())
46 return MOJO_RESULT_OK;
47
48 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
49 if (!reader.is_valid())
50 return MOJO_RESULT_INVALID_ARGUMENT;
51
52 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
53 return MOJO_RESULT_OK;
54 if ((reader.options().flags & ~kKnownFlags))
55 return MOJO_RESULT_UNIMPLEMENTED;
56 out_options->flags = reader.options().flags;
57
58 // Checks for fields beyond |flags|:
59
60 // (Nothing here yet.)
61
62 return MOJO_RESULT_OK;
63 }
64
65 void MessagePipeDispatcher::Init(embedder::ScopedPlatformHandle message_pipe) {
66 InitWithReadBuffer(message_pipe.Pass(), nullptr, 0);
67 }
68
69 void MessagePipeDispatcher::InitWithReadBuffer(
70 embedder::ScopedPlatformHandle message_pipe,
71 char* data,
72 size_t size) {
73 if (message_pipe.get().is_valid()) {
74 channel_ = RawChannel::Create(message_pipe.Pass());
75
76
77
78
79 // TODO(jam): pass this in Init call....
80 if (size)
81 channel_->SetInitialReadBufferData(data, size);
82 if (g_use_channel_on_io) {
83 mojo::embedder::internal::g_io_thread_task_runner->PostTask(
84 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
85 } else {
86 InitOnIO();
87 }
88 // TODO(jam): optimize for when running on IO thread
89 }
90 }
91
92 void MessagePipeDispatcher::InitOnIO() {
93 base::AutoLock locker(lock());
94 calling_init_ = true;
95 if (channel_)
96 channel_->Init(this);
97 calling_init_ = false;
98 }
99
100 void MessagePipeDispatcher::CloseOnIO() {
101 base::AutoLock locker(lock());
102
103 // TODO(jam) CLEANUP! this should be done inside RawChannel.....
104 if (channel_) {
105 channel_->Shutdown();
106 channel_ = nullptr;
107 }
108 }
109
110 Dispatcher::Type MessagePipeDispatcher::GetType() const {
111 return Type::MESSAGE_PIPE;
112 }
113
114
115
116 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
117 // best way we want to share this. Need to also consider posix which does
118 // require access to the RawChannel.
119 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
120 size_t num_platform_handles,
121 const void* platform_handle_table) {
122 // TODO(jam): this code will have to be updated once it's used in a sandbox
123 // and the receiving process doesn't have duplicate permission for the
124 // receiver. Once there's a broker and we have a connection to it (possibly
125 // through ConnectionManager), then we can make a sync IPC to it here to get a
126 // token for this handle, and it will duplicate the handle to is process. Then
127 // we pass the token to the receiver, which will then make a sync call to the
128 // broker to get a duplicated handle. This will also allow us to avoid leaks
129 // of the handle if the receiver dies, since the broker can notice that.
yzshen1 2015/09/23 22:47:08 (Just wonder whether you've come up with some new
130 DCHECK_GT(num_platform_handles, 0u);
131 embedder::ScopedPlatformHandleVectorPtr rv(
132 new embedder::PlatformHandleVector());
133
134 #if defined(OS_WIN)
135 const char* serialization_data =
136 static_cast<const char*>(platform_handle_table);
137 for (size_t i = 0; i < num_platform_handles; i++) {
138 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data);
139 serialization_data += sizeof(DWORD);
140 HANDLE source_handle = *reinterpret_cast<const HANDLE*>(serialization_data);
141 serialization_data += sizeof(HANDLE);
142 base::Process sender =
143 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
144 DCHECK(sender.IsValid());
145 HANDLE target_handle = NULL;
146 BOOL dup_result =
147 DuplicateHandle(sender.Handle(), source_handle,
148 base::GetCurrentProcessHandle(), &target_handle, 0,
149 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
150 DCHECK(dup_result);
151 rv->push_back(embedder::PlatformHandle(target_handle));
152 }
153 #else
154 NOTREACHED() << "TODO(jam): implement";
155 #endif
156 return rv.Pass();
157 }
158
159 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
160 const void* source,
161 size_t size,
162 embedder::PlatformHandleVector* platform_handles) {
163 const SerializedMessagePipeHandleDispatcher* serialization =
164 static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
165 size_t platform_handle_index = serialization->platform_handle_index;
166
167
168 // Starts off invalid, which is what we want.
169 embedder::PlatformHandle platform_handle;
170
171 if (platform_handle_index != kInvalidMessagePipeHandleIndex) {
172 if (!platform_handles ||
173 platform_handle_index >= platform_handles->size()) {
174 LOG(ERROR)
175 << "Invalid serialized platform handle dispatcher (missing handles)";
176 return nullptr;
177 }
178
179 // We take ownership of the handle, so we have to invalidate the one in
180 // |platform_handles|.
181 std::swap(platform_handle, (*platform_handles)[platform_handle_index]);
182 }
183
184
185 // TODO(jam): temporary until we send message_queue_ via shared memory.
186 size -= sizeof(SerializedMessagePipeHandleDispatcher);
187 const char* messages = static_cast<const char*>(source);
188 messages += sizeof(SerializedMessagePipeHandleDispatcher);
189
190
191 char* initial_read_data = nullptr;
192 size_t initial_read_size = 0;
193
194 if (serialization->read_buffer_size) {
195 initial_read_data = const_cast<char*>(messages);
196 initial_read_size = serialization->read_buffer_size;
197
198 messages += initial_read_size;
199 size -= initial_read_size;
200 }
201
202 scoped_refptr<MessagePipeDispatcher> rv(
203 Create(MessagePipeDispatcher::kDefaultCreateOptions));
204 rv->InitWithReadBuffer(
205 embedder::ScopedPlatformHandle(platform_handle),
206 initial_read_data, initial_read_size);
207
208 while (size) {
209 size_t message_size;
210 CHECK(MessageInTransit::GetNextMessageSize(
211 messages, size, &message_size));
212 MessageInTransit::View message_view(message_size, messages);
213 size -= message_size;
214 messages += message_size;
215
216 // copied from RawChannel::OnReadCompleted
217 // TODO(jam): don't copy
218 embedder::ScopedPlatformHandleVectorPtr platform_handles;
219 if (message_view.transport_data_buffer()) {
220 size_t num_platform_handles;
221 const void* platform_handle_table;
222 TransportData::GetPlatformHandleTable(
223 message_view.transport_data_buffer(), &num_platform_handles,
224 &platform_handle_table);
225
226 if (num_platform_handles > 0) {
227 platform_handles =
228 GetReadPlatformHandles(num_platform_handles,
229 platform_handle_table).Pass();
230 if (!platform_handles) {
231 LOG(ERROR) << "Invalid number of platform handles received";
232 return nullptr;
233 }
234 }
235 }
236
237
238 // copied below from OnReadMessage
239 // TODO(jam): don't copy
240 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
241 if (message_view.transport_data_buffer_size() > 0) {
242 DCHECK(message_view.transport_data_buffer());
243 message->SetDispatchers(TransportData::DeserializeDispatchers(
244 message_view.transport_data_buffer(),
245 message_view.transport_data_buffer_size(), platform_handles.Pass()));
246 }
247
248 rv->message_queue_.AddMessage(message.Pass());
249 }
250
251 return rv;
252 }
253
254 MessagePipeDispatcher::MessagePipeDispatcher()
255 : channel_(nullptr),
256 serialized_(false),
257 calling_init_(false),
258 error_(false) {
259 }
260
261 MessagePipeDispatcher::~MessagePipeDispatcher() {
262 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
263 DCHECK(!channel_);
264 }
265
266 void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
267 lock().AssertAcquired();
268 awakable_list_.CancelAll();
269 }
270
271 void MessagePipeDispatcher::CloseImplNoLock() {
272 lock().AssertAcquired();
273 if (g_use_channel_on_io) {
274 mojo::embedder::internal::g_io_thread_task_runner->PostTask(
275 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
276 } else {
277 CloseOnIO();
278 }
279 }
280
281 void MessagePipeDispatcher::SerializeInternal() {
282 // need to stop watching handle immediately, even tho not on IO thread, so
283 // that other messages aren't read after this.
284 {
285 if (channel_) {
286 serialized_platform_handle_ =
287 channel_->ReleaseHandle(&serialized_read_buffer_).release();
288
289 channel_ = nullptr;
yzshen1 2015/09/23 22:47:08 This seems unintended because some places expect t
290 } else {
291 // It's valid that the other side wrote some data and closed its end.
292 }
293 }
294
295 DCHECK(serialized_message_queue_.empty());
296 // see comment in method below, this is only temporary till we implement a
297 // solution with shared buffer
298 while (!message_queue_.IsEmpty()) {
299 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
300 size_t cur_size = serialized_message_queue_.size();
301
302
303 // When MojoWriteMessage is called, the MessageInTransit doesn't have
304 // dispatchers set and CreateEquivaent... is called since the dispatchers
305 // can be referenced by others. here dispatchers aren't referenced by
306 // others, but rawchannel can still call to them. so since we dont call
307 // createequiv, manually call TransportStarted and TransportEnd.
308 DispatcherVector dispatchers;
309 if (message->has_dispatchers())
310 dispatchers = *message->dispatchers();
311 for (size_t i = 0; i < dispatchers.size(); ++i)
312 dispatchers[i]->TransportStarted();
313
314 //TODO(jam): this handling for dispatchers only works on windows where we
315 //send transportdata as bytes instead of as parameters to sendmsg.
316 message->SerializeAndCloseDispatchers();
317 // cont'd below
318
319
320 size_t main_buffer_size = message->main_buffer_size();
321 size_t transport_data_buffer_size = message->transport_data() ?
322 message->transport_data()->buffer_size() : 0;
323 size_t total_size = message->total_size();
324
325
326
327 serialized_message_queue_.resize(cur_size + total_size);
328 memcpy(&serialized_message_queue_[cur_size], message->main_buffer(),
329 main_buffer_size);
330
331 // cont'd
332 if (transport_data_buffer_size != 0) {
333 #if defined(OS_WIN)
334 // TODO(jam): copied from RawChannelWin::WriteNoLock(
335 if (channel_->GetSerializedPlatformHandleSize()) {
yzshen1 2015/09/23 22:47:08 channel_ has been set to null on line 289?
336 char* serialization_data =
337 static_cast<char*>(message->transport_data()->buffer()) +
338 message->transport_data()->platform_handle_table_offset();
339 embedder::PlatformHandleVector* all_platform_handles =
340 message->transport_data()->platform_handles();
341 if (all_platform_handles) {
342 DWORD current_process_id = base::GetCurrentProcId();
343 for (size_t i = 0; i < all_platform_handles->size(); i++) {
344 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id;
345 serialization_data += sizeof(DWORD);
346 *reinterpret_cast<HANDLE*>(serialization_data) =
347 all_platform_handles->at(i).handle;
348 serialization_data += sizeof(HANDLE);
349 all_platform_handles->at(i) = embedder::PlatformHandle();
350 }
351 }
352 }
353
354 memcpy(&serialized_message_queue_[
355 cur_size + total_size - transport_data_buffer_size],
356 message->transport_data()->buffer(), transport_data_buffer_size);
357 #else
358 NOTREACHED() << "TODO(jam) implement";
359 #endif
360 }
361
362 for (size_t i = 0; i < dispatchers.size(); ++i)
363 dispatchers[i]->TransportEnded();
364 }
365
366 serialized_ = true;
367 }
368
369 scoped_refptr<Dispatcher>
370 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
371 lock().AssertAcquired();
372
373 SerializeInternal();
374
375 // TODO(vtl): Currently, there are no options, so we just use
376 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
377 // too.
378 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
379 rv->channel_ = channel_;
yzshen1 2015/09/23 22:47:08 |channel_| has been set to nullptr in SerializeInt
380 channel_ = nullptr;
381
382
383 rv->serialized_platform_handle_ = serialized_platform_handle_;
384 serialized_platform_handle_ = mojo::embedder::PlatformHandle();
385 serialized_message_queue_.swap(rv->serialized_message_queue_);
386 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
387 rv->serialized_ = true;
388 return scoped_refptr<Dispatcher>(rv.get());
389 }
390
391 MojoResult AttachTransportsNoLock(
392 MessageInTransit* message,
393 std::vector<DispatcherTransport>* transports) {
394 DCHECK(!message->has_dispatchers());
395 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
396 dispatchers->reserve(transports->size());
397 for (size_t i = 0; i < transports->size(); i++) {
398 if ((*transports)[i].is_valid()) {
399 dispatchers->push_back(
400 (*transports)[i].CreateEquivalentDispatcherAndClose());
401 } else {
402 LOG(WARNING) << "Enqueueing null dispatcher";
403 dispatchers->push_back(nullptr);
404 }
405 }
406 message->SetDispatchers(dispatchers.Pass());
407 return MOJO_RESULT_OK;
408 }
409
410
411 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
412 UserPointer<const void> bytes,
413 uint32_t num_bytes,
414 std::vector<DispatcherTransport>* transports,
415 MojoWriteMessageFlags flags) {
416
417 DCHECK(!transports ||
418 (transports->size() > 0 &&
419 transports->size() <= GetConfiguration().max_message_num_handles));
420
421 lock().AssertAcquired();
422
423 if (!channel_) {
424 DCHECK(error_);
425 return MOJO_RESULT_FAILED_PRECONDITION;
426 }
427
428 if (num_bytes > GetConfiguration().max_message_num_bytes)
429 return MOJO_RESULT_RESOURCE_EXHAUSTED;
430 scoped_ptr<MessageInTransit> message(new MessageInTransit(
431 MessageInTransit::Type::MESSAGE, num_bytes, bytes));
432 if (transports) {
433 MojoResult result = AttachTransportsNoLock(message.get(), transports);
434 if (result != MOJO_RESULT_OK)
435 return result;
436 }
437
438 // TODO(jam): pass in GetSerializedPlatformHandleSize instead of RawChannel
439 message->SerializeAndCloseDispatchers();
440 channel_->WriteMessage(message.Pass());
441
442 return MOJO_RESULT_OK;
443 }
444
445 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
446 UserPointer<void> bytes,
447 UserPointer<uint32_t> num_bytes,
448 DispatcherVector* dispatchers,
449 uint32_t* num_dispatchers,
450 MojoReadMessageFlags flags) {
451 lock().AssertAcquired();
452 // return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers,
453 // num_dispatchers, flags);
454
455 DCHECK(!dispatchers || dispatchers->empty());
456
457 const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get();
458 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
459
460 if (message_queue_.IsEmpty()) {
461 return error_ ? MOJO_RESULT_FAILED_PRECONDITION
462 : MOJO_RESULT_SHOULD_WAIT;
463 }
464
465 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
466 // and release the lock immediately.
467 bool enough_space = true;
468 MessageInTransit* message = message_queue_.PeekMessage();
469 if (!num_bytes.IsNull())
470 num_bytes.Put(message->num_bytes());
471 if (message->num_bytes() <= max_bytes)
472 bytes.PutArray(message->bytes(), message->num_bytes());
473 else
474 enough_space = false;
475
476 if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
477 if (num_dispatchers)
478 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
479 if (enough_space) {
480 if (queued_dispatchers->empty()) {
481 // Nothing to do.
482 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
483 DCHECK(dispatchers);
484 dispatchers->swap(*queued_dispatchers);
485 } else {
486 enough_space = false;
487 }
488 }
489 } else {
490 if (num_dispatchers)
491 *num_dispatchers = 0;
492 }
493
494 message = nullptr;
495
496 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
497 message_queue_.DiscardMessage();
498
499 // Now it's empty, thus no longer readable.
500 if (message_queue_.IsEmpty()) {
501 // It's currently not possible to wait for non-readability, but we should
502 // do the state change anyway.
503 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
504 }
505 }
506
507 if (!enough_space)
508 return MOJO_RESULT_RESOURCE_EXHAUSTED;
509
510 return MOJO_RESULT_OK;
511 }
512
513 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
514 const {
515 lock().AssertAcquired();
516 // return message_pipe_->GetHandleSignalsState(port_);
517
518 HandleSignalsState rv;
519 if (!message_queue_.IsEmpty()) {
520 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
521 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
522 }
523 if (!error_) {
524 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
525 rv.satisfiable_signals |=
526 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
527 } else {
528 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
529 }
530 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
531 return rv;
532 }
533
534 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
535 Awakable* awakable,
536 MojoHandleSignals signals,
537 uint32_t context,
538 HandleSignalsState* signals_state) {
539 lock().AssertAcquired();
540 // return message_pipe_->AddAwakable(port_, awakable, signals, context,
541 // signals_state);
542 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
543 if (state.satisfies(signals)) {
544 if (signals_state)
545 *signals_state = state;
546 return MOJO_RESULT_ALREADY_EXISTS;
547 }
548 if (!state.can_satisfy(signals)) {
549 if (signals_state)
550 *signals_state = state;
551 return MOJO_RESULT_FAILED_PRECONDITION;
552 }
553
554 awakable_list_.Add(awakable, signals, context);
555 return MOJO_RESULT_OK;
556 }
557
558 void MessagePipeDispatcher::RemoveAwakableImplNoLock(
559 Awakable* awakable,
560 HandleSignalsState* signals_state) {
561 lock().AssertAcquired();
562
563 awakable_list_.Remove(awakable);
564 if (signals_state)
565 *signals_state = GetHandleSignalsStateImplNoLock();
566 }
567
568 void MessagePipeDispatcher::StartSerializeImplNoLock(
569 size_t* max_size,
570 size_t* max_platform_handles) {
571 // see comment in dispatcher::startserialize
572 // DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
573
574 if (!serialized_) {
575 // handles the case where we have messages read off rawchannel but not
576 // ready by MojoReadMessage.
577 SerializeInternal();
578 }
579
580 *max_platform_handles = serialized_platform_handle_.is_valid() ? 1 : 0;
581
582 DCHECK_EQ(serialized_message_queue_.size() %
583 MessageInTransit::kMessageAlignment, 0U);
584 *max_size = sizeof(SerializedMessagePipeHandleDispatcher) +
585 serialized_message_queue_.size() +
586 serialized_read_buffer_.size();
587
588 DCHECK_LE(*max_size, TransportData::kMaxSerializedDispatcherSize);
589 }
590
591 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
592 void* destination,
593 size_t* actual_size,
594 embedder::PlatformHandleVector* platform_handles) {
595 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
596
597 CloseImplNoLock();
598 SerializedMessagePipeHandleDispatcher* serialization =
599 static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
600 if (serialized_platform_handle_.is_valid()) {
601 serialization->platform_handle_index = platform_handles->size();
602 platform_handles->push_back(serialized_platform_handle_);
603 } else {
604 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
605 }
606 serialization->read_buffer_size = serialized_read_buffer_.size();
607
608 char* destination_char = static_cast<char*>(destination);
609 destination_char += sizeof(SerializedMessagePipeHandleDispatcher);
610
611 if (!serialized_read_buffer_.empty()) {
612 memcpy(destination_char, &serialized_read_buffer_[0],
613 serialized_read_buffer_.size());
614 destination_char += serialized_read_buffer_.size();
615 }
616
617
618 if (!serialized_message_queue_.empty()) {
619 memcpy(destination_char,
620 &serialized_message_queue_[0],
621 serialized_message_queue_.size());
622 }
623
624 *actual_size =
625 sizeof(SerializedMessagePipeHandleDispatcher) +
626 serialized_message_queue_.size() +
627 serialized_read_buffer_.size();
628
629 return true;
630 }
631
632 void MessagePipeDispatcher::TransportStarted() {
633 started_transport_.Acquire();
634 }
635
636 void MessagePipeDispatcher::TransportEnded() {
637 started_transport_.Release();
638
639 base::AutoLock locker(lock());
640
641 // If transporting of MPD failed, we might have got more data and didn't
642 // awake for.
643 // TODO(jam): should we care about only alerting if it was empty before
644 // TransportStarted?
645 if (!message_queue_.IsEmpty())
646 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
647 }
648
649 void MessagePipeDispatcher::OnReadMessage(
650 const MessageInTransit::View& message_view,
651 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
652 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
653 if (message_view.transport_data_buffer_size() > 0) {
654 DCHECK(message_view.transport_data_buffer());
655 message->SetDispatchers(TransportData::DeserializeDispatchers(
656 message_view.transport_data_buffer(),
657 message_view.transport_data_buffer_size(), platform_handles.Pass()));
658 }
659
660 if (started_transport_.Try()) {
661 // we're not in the middle of being sent
662
663 // Can get synchronously called back in Init if there was initial data.
664 scoped_ptr<base::AutoLock> locker;
665 if (!calling_init_) {
666 locker.reset(new base::AutoLock(lock()));
667 }
668
669 bool was_empty = message_queue_.IsEmpty();
670 message_queue_.AddMessage(message.Pass());
671 if (was_empty)
672 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
673
674 started_transport_.Release();
675 } else {
676
677 // if RawChannel is calling OnRead, that means it has its read_lock_
678 // acquired. that means StartSerialize can't be accessing message queue as
679 // it waits on releasehandle first which acquires readlock_!
680 message_queue_.AddMessage(message.Pass());
681 }
682 }
683
684 void MessagePipeDispatcher::OnError(Error error) {
685 switch (error) {
686 case ERROR_READ_SHUTDOWN:
687 // The other side was cleanly closed, so this isn't actually an error.
688 DVLOG(1) << "MessagePipeDispatcher read error (shutdown)";
689 break;
690 case ERROR_READ_BROKEN:
691 LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)";
692 break;
693 case ERROR_READ_BAD_MESSAGE:
694 // Receiving a bad message means either a bug, data corruption, or
695 // malicious attack (probably due to some other bug).
696 LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)";
697 break;
698 case ERROR_READ_UNKNOWN:
699 LOG(ERROR) << "MessagePipeDispatcher read error (unknown)";
700 break;
701 case ERROR_WRITE:
702 // Write errors are slightly notable: they probably shouldn't happen under
703 // normal operation (but maybe the other side crashed).
704 LOG(WARNING) << "MessagePipeDispatcher write error";
705 break;
706 }
707
708 error_ = true;
709 if (started_transport_.Try()) {
710 base::AutoLock locker(lock());
711 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
712
713 base::MessageLoop::current()->PostTask(
714 FROM_HERE,
715 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
716 channel_ = nullptr;
717 started_transport_.Release();
718 } else {
719 // We must be waiting to call ReleaseHandle. It will call Shutdown.
720 }
721 }
722
723 } // namespace system
724 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698