Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1100)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/message_pipe_dispatcher.cc
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2bdeca2f4b184316fdc098c2a35d15b89c1cf51b
--- /dev/null
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -0,0 +1,724 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/message_pipe_dispatcher.h"
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/system/configuration.h"
+#include "mojo/edk/system/memory.h"
+#include "mojo/edk/system/message_in_transit.h"
+#include "mojo/edk/system/options_validation.h"
+#include "mojo/edk/system/transport_data.h"
+
+// TODO(jam): do more tests on using channel on same thread if it supports it (
+// i.e. with USE_CHROME_EDK and Windows). Also see ipc_channel_mojo.cc
+bool g_use_channel_on_io = true;
+
+namespace mojo {
+namespace system {
+
+const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
+
+struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
+ size_t platform_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
+ size_t read_buffer_size; // any bytes after this are serialized messages
+};
+
+// MessagePipeDispatcher -------------------------------------------------------
+
+const MojoCreateMessagePipeOptions
+ MessagePipeDispatcher::kDefaultCreateOptions = {
+ static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
+
+MojoResult MessagePipeDispatcher::ValidateCreateOptions(
+ UserPointer<const MojoCreateMessagePipeOptions> in_options,
+ MojoCreateMessagePipeOptions* out_options) {
+ const MojoCreateMessagePipeOptionsFlags kKnownFlags =
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
+
+ *out_options = kDefaultCreateOptions;
+ if (in_options.IsNull())
+ return MOJO_RESULT_OK;
+
+ UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
+ if (!reader.is_valid())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
+ return MOJO_RESULT_OK;
+ if ((reader.options().flags & ~kKnownFlags))
+ return MOJO_RESULT_UNIMPLEMENTED;
+ out_options->flags = reader.options().flags;
+
+ // Checks for fields beyond |flags|:
+
+ // (Nothing here yet.)
+
+ return MOJO_RESULT_OK;
+}
+
+void MessagePipeDispatcher::Init(embedder::ScopedPlatformHandle message_pipe) {
+ InitWithReadBuffer(message_pipe.Pass(), nullptr, 0);
+}
+
+void MessagePipeDispatcher::InitWithReadBuffer(
+ embedder::ScopedPlatformHandle message_pipe,
+ char* data,
+ size_t size) {
+ if (message_pipe.get().is_valid()) {
+ channel_ = RawChannel::Create(message_pipe.Pass());
+
+
+
+
+ // TODO(jam): pass this in Init call....
+ if (size)
+ channel_->SetInitialReadBufferData(data, size);
+ if (g_use_channel_on_io) {
+ mojo::embedder::internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
+ } else {
+ InitOnIO();
+ }
+ // TODO(jam): optimize for when running on IO thread
+ }
+}
+
+void MessagePipeDispatcher::InitOnIO() {
+ base::AutoLock locker(lock());
+ calling_init_ = true;
+ if (channel_)
+ channel_->Init(this);
+ calling_init_ = false;
+}
+
+void MessagePipeDispatcher::CloseOnIO() {
+ base::AutoLock locker(lock());
+
+ // TODO(jam) CLEANUP! this should be done inside RawChannel.....
+ if (channel_) {
+ channel_->Shutdown();
+ channel_ = nullptr;
+ }
+}
+
+Dispatcher::Type MessagePipeDispatcher::GetType() const {
+ return Type::MESSAGE_PIPE;
+}
+
+
+
+// TODO(jam): this is copied from RawChannelWin till I figure out what's the
+// best way we want to share this. Need to also consider posix which does
+// require access to the RawChannel.
+embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
+ size_t num_platform_handles,
+ const void* platform_handle_table) {
+ // TODO(jam): this code will have to be updated once it's used in a sandbox
+ // and the receiving process doesn't have duplicate permission for the
+ // receiver. Once there's a broker and we have a connection to it (possibly
+ // through ConnectionManager), then we can make a sync IPC to it here to get a
+ // token for this handle, and it will duplicate the handle to is process. Then
+ // we pass the token to the receiver, which will then make a sync call to the
+ // broker to get a duplicated handle. This will also allow us to avoid leaks
+ // of the handle if the receiver dies, since the broker can notice that.
yzshen1 2015/09/23 22:47:08 (Just wonder whether you've come up with some new
+ DCHECK_GT(num_platform_handles, 0u);
+ embedder::ScopedPlatformHandleVectorPtr rv(
+ new embedder::PlatformHandleVector());
+
+#if defined(OS_WIN)
+ const char* serialization_data =
+ static_cast<const char*>(platform_handle_table);
+ for (size_t i = 0; i < num_platform_handles; i++) {
+ DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data);
+ serialization_data += sizeof(DWORD);
+ HANDLE source_handle = *reinterpret_cast<const HANDLE*>(serialization_data);
+ serialization_data += sizeof(HANDLE);
+ base::Process sender =
+ base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
+ DCHECK(sender.IsValid());
+ HANDLE target_handle = NULL;
+ BOOL dup_result =
+ DuplicateHandle(sender.Handle(), source_handle,
+ base::GetCurrentProcessHandle(), &target_handle, 0,
+ FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
+ DCHECK(dup_result);
+ rv->push_back(embedder::PlatformHandle(target_handle));
+ }
+#else
+ NOTREACHED() << "TODO(jam): implement";
+#endif
+ return rv.Pass();
+}
+
+scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
+ const void* source,
+ size_t size,
+ embedder::PlatformHandleVector* platform_handles) {
+ const SerializedMessagePipeHandleDispatcher* serialization =
+ static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
+ size_t platform_handle_index = serialization->platform_handle_index;
+
+
+ // Starts off invalid, which is what we want.
+ embedder::PlatformHandle platform_handle;
+
+ if (platform_handle_index != kInvalidMessagePipeHandleIndex) {
+ if (!platform_handles ||
+ platform_handle_index >= platform_handles->size()) {
+ LOG(ERROR)
+ << "Invalid serialized platform handle dispatcher (missing handles)";
+ return nullptr;
+ }
+
+ // We take ownership of the handle, so we have to invalidate the one in
+ // |platform_handles|.
+ std::swap(platform_handle, (*platform_handles)[platform_handle_index]);
+ }
+
+
+ // TODO(jam): temporary until we send message_queue_ via shared memory.
+ size -= sizeof(SerializedMessagePipeHandleDispatcher);
+ const char* messages = static_cast<const char*>(source);
+ messages += sizeof(SerializedMessagePipeHandleDispatcher);
+
+
+ char* initial_read_data = nullptr;
+ size_t initial_read_size = 0;
+
+ if (serialization->read_buffer_size) {
+ initial_read_data = const_cast<char*>(messages);
+ initial_read_size = serialization->read_buffer_size;
+
+ messages += initial_read_size;
+ size -= initial_read_size;
+ }
+
+ scoped_refptr<MessagePipeDispatcher> rv(
+ Create(MessagePipeDispatcher::kDefaultCreateOptions));
+ rv->InitWithReadBuffer(
+ embedder::ScopedPlatformHandle(platform_handle),
+ initial_read_data, initial_read_size);
+
+ while (size) {
+ size_t message_size;
+ CHECK(MessageInTransit::GetNextMessageSize(
+ messages, size, &message_size));
+ MessageInTransit::View message_view(message_size, messages);
+ size -= message_size;
+ messages += message_size;
+
+ // copied from RawChannel::OnReadCompleted
+ // TODO(jam): don't copy
+ embedder::ScopedPlatformHandleVectorPtr platform_handles;
+ if (message_view.transport_data_buffer()) {
+ size_t num_platform_handles;
+ const void* platform_handle_table;
+ TransportData::GetPlatformHandleTable(
+ message_view.transport_data_buffer(), &num_platform_handles,
+ &platform_handle_table);
+
+ if (num_platform_handles > 0) {
+ platform_handles =
+ GetReadPlatformHandles(num_platform_handles,
+ platform_handle_table).Pass();
+ if (!platform_handles) {
+ LOG(ERROR) << "Invalid number of platform handles received";
+ return nullptr;
+ }
+ }
+ }
+
+
+ // copied below from OnReadMessage
+ // TODO(jam): don't copy
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
+ if (message_view.transport_data_buffer_size() > 0) {
+ DCHECK(message_view.transport_data_buffer());
+ message->SetDispatchers(TransportData::DeserializeDispatchers(
+ message_view.transport_data_buffer(),
+ message_view.transport_data_buffer_size(), platform_handles.Pass()));
+ }
+
+ rv->message_queue_.AddMessage(message.Pass());
+ }
+
+ return rv;
+}
+
+MessagePipeDispatcher::MessagePipeDispatcher()
+ : channel_(nullptr),
+ serialized_(false),
+ calling_init_(false),
+ error_(false) {
+}
+
+MessagePipeDispatcher::~MessagePipeDispatcher() {
+ // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
+ DCHECK(!channel_);
+}
+
+void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
+ lock().AssertAcquired();
+ awakable_list_.CancelAll();
+}
+
+void MessagePipeDispatcher::CloseImplNoLock() {
+ lock().AssertAcquired();
+ if (g_use_channel_on_io) {
+ mojo::embedder::internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
+ } else {
+ CloseOnIO();
+ }
+}
+
+void MessagePipeDispatcher::SerializeInternal() {
+ // need to stop watching handle immediately, even tho not on IO thread, so
+ // that other messages aren't read after this.
+ {
+ if (channel_) {
+ serialized_platform_handle_ =
+ channel_->ReleaseHandle(&serialized_read_buffer_).release();
+
+ channel_ = nullptr;
yzshen1 2015/09/23 22:47:08 This seems unintended because some places expect t
+ } else {
+ // It's valid that the other side wrote some data and closed its end.
+ }
+ }
+
+ DCHECK(serialized_message_queue_.empty());
+ // see comment in method below, this is only temporary till we implement a
+ // solution with shared buffer
+ while (!message_queue_.IsEmpty()) {
+ scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
+ size_t cur_size = serialized_message_queue_.size();
+
+
+ // When MojoWriteMessage is called, the MessageInTransit doesn't have
+ // dispatchers set and CreateEquivaent... is called since the dispatchers
+ // can be referenced by others. here dispatchers aren't referenced by
+ // others, but rawchannel can still call to them. so since we dont call
+ // createequiv, manually call TransportStarted and TransportEnd.
+ DispatcherVector dispatchers;
+ if (message->has_dispatchers())
+ dispatchers = *message->dispatchers();
+ for (size_t i = 0; i < dispatchers.size(); ++i)
+ dispatchers[i]->TransportStarted();
+
+ //TODO(jam): this handling for dispatchers only works on windows where we
+ //send transportdata as bytes instead of as parameters to sendmsg.
+ message->SerializeAndCloseDispatchers();
+ // cont'd below
+
+
+ size_t main_buffer_size = message->main_buffer_size();
+ size_t transport_data_buffer_size = message->transport_data() ?
+ message->transport_data()->buffer_size() : 0;
+ size_t total_size = message->total_size();
+
+
+
+ serialized_message_queue_.resize(cur_size + total_size);
+ memcpy(&serialized_message_queue_[cur_size], message->main_buffer(),
+ main_buffer_size);
+
+ // cont'd
+ if (transport_data_buffer_size != 0) {
+#if defined(OS_WIN)
+ // TODO(jam): copied from RawChannelWin::WriteNoLock(
+ if (channel_->GetSerializedPlatformHandleSize()) {
yzshen1 2015/09/23 22:47:08 channel_ has been set to null on line 289?
+ char* serialization_data =
+ static_cast<char*>(message->transport_data()->buffer()) +
+ message->transport_data()->platform_handle_table_offset();
+ embedder::PlatformHandleVector* all_platform_handles =
+ message->transport_data()->platform_handles();
+ if (all_platform_handles) {
+ DWORD current_process_id = base::GetCurrentProcId();
+ for (size_t i = 0; i < all_platform_handles->size(); i++) {
+ *reinterpret_cast<DWORD*>(serialization_data) = current_process_id;
+ serialization_data += sizeof(DWORD);
+ *reinterpret_cast<HANDLE*>(serialization_data) =
+ all_platform_handles->at(i).handle;
+ serialization_data += sizeof(HANDLE);
+ all_platform_handles->at(i) = embedder::PlatformHandle();
+ }
+ }
+ }
+
+ memcpy(&serialized_message_queue_[
+ cur_size + total_size - transport_data_buffer_size],
+ message->transport_data()->buffer(), transport_data_buffer_size);
+#else
+ NOTREACHED() << "TODO(jam) implement";
+#endif
+ }
+
+ for (size_t i = 0; i < dispatchers.size(); ++i)
+ dispatchers[i]->TransportEnded();
+ }
+
+ serialized_ = true;
+}
+
+scoped_refptr<Dispatcher>
+MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
+ lock().AssertAcquired();
+
+ SerializeInternal();
+
+ // TODO(vtl): Currently, there are no options, so we just use
+ // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
+ // too.
+ scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
+ rv->channel_ = channel_;
yzshen1 2015/09/23 22:47:08 |channel_| has been set to nullptr in SerializeInt
+ channel_ = nullptr;
+
+
+ rv->serialized_platform_handle_ = serialized_platform_handle_;
+ serialized_platform_handle_ = mojo::embedder::PlatformHandle();
+ serialized_message_queue_.swap(rv->serialized_message_queue_);
+ serialized_read_buffer_.swap(rv->serialized_read_buffer_);
+ rv->serialized_ = true;
+ return scoped_refptr<Dispatcher>(rv.get());
+}
+
+MojoResult AttachTransportsNoLock(
+ MessageInTransit* message,
+ std::vector<DispatcherTransport>* transports) {
+ DCHECK(!message->has_dispatchers());
+ scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
+ dispatchers->reserve(transports->size());
+ for (size_t i = 0; i < transports->size(); i++) {
+ if ((*transports)[i].is_valid()) {
+ dispatchers->push_back(
+ (*transports)[i].CreateEquivalentDispatcherAndClose());
+ } else {
+ LOG(WARNING) << "Enqueueing null dispatcher";
+ dispatchers->push_back(nullptr);
+ }
+ }
+ message->SetDispatchers(dispatchers.Pass());
+ return MOJO_RESULT_OK;
+}
+
+
+MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
+ UserPointer<const void> bytes,
+ uint32_t num_bytes,
+ std::vector<DispatcherTransport>* transports,
+ MojoWriteMessageFlags flags) {
+
+ DCHECK(!transports ||
+ (transports->size() > 0 &&
+ transports->size() <= GetConfiguration().max_message_num_handles));
+
+ lock().AssertAcquired();
+
+ if (!channel_) {
+ DCHECK(error_);
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
+
+ if (num_bytes > GetConfiguration().max_message_num_bytes)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, num_bytes, bytes));
+ if (transports) {
+ MojoResult result = AttachTransportsNoLock(message.get(), transports);
+ if (result != MOJO_RESULT_OK)
+ return result;
+ }
+
+ // TODO(jam): pass in GetSerializedPlatformHandleSize instead of RawChannel
+ message->SerializeAndCloseDispatchers();
+ channel_->WriteMessage(message.Pass());
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
+ UserPointer<void> bytes,
+ UserPointer<uint32_t> num_bytes,
+ DispatcherVector* dispatchers,
+ uint32_t* num_dispatchers,
+ MojoReadMessageFlags flags) {
+ lock().AssertAcquired();
+// return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers,
+// num_dispatchers, flags);
+
+ DCHECK(!dispatchers || dispatchers->empty());
+
+ const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get();
+ const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
+
+ if (message_queue_.IsEmpty()) {
+ return error_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_SHOULD_WAIT;
+ }
+
+ // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
+ // and release the lock immediately.
+ bool enough_space = true;
+ MessageInTransit* message = message_queue_.PeekMessage();
+ if (!num_bytes.IsNull())
+ num_bytes.Put(message->num_bytes());
+ if (message->num_bytes() <= max_bytes)
+ bytes.PutArray(message->bytes(), message->num_bytes());
+ else
+ enough_space = false;
+
+ if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
+ if (num_dispatchers)
+ *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
+ if (enough_space) {
+ if (queued_dispatchers->empty()) {
+ // Nothing to do.
+ } else if (queued_dispatchers->size() <= max_num_dispatchers) {
+ DCHECK(dispatchers);
+ dispatchers->swap(*queued_dispatchers);
+ } else {
+ enough_space = false;
+ }
+ }
+ } else {
+ if (num_dispatchers)
+ *num_dispatchers = 0;
+ }
+
+ message = nullptr;
+
+ if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
+ message_queue_.DiscardMessage();
+
+ // Now it's empty, thus no longer readable.
+ if (message_queue_.IsEmpty()) {
+ // It's currently not possible to wait for non-readability, but we should
+ // do the state change anyway.
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ }
+ }
+
+ if (!enough_space)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ return MOJO_RESULT_OK;
+}
+
+HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
+ const {
+ lock().AssertAcquired();
+// return message_pipe_->GetHandleSignalsState(port_);
+
+ HandleSignalsState rv;
+ if (!message_queue_.IsEmpty()) {
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
+ }
+ if (!error_) {
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+ rv.satisfiable_signals |=
+ MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
+ } else {
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ }
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ return rv;
+}
+
+MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
+ Awakable* awakable,
+ MojoHandleSignals signals,
+ uint32_t context,
+ HandleSignalsState* signals_state) {
+ lock().AssertAcquired();
+// return message_pipe_->AddAwakable(port_, awakable, signals, context,
+// signals_state);
+ HandleSignalsState state = GetHandleSignalsStateImplNoLock();
+ if (state.satisfies(signals)) {
+ if (signals_state)
+ *signals_state = state;
+ return MOJO_RESULT_ALREADY_EXISTS;
+ }
+ if (!state.can_satisfy(signals)) {
+ if (signals_state)
+ *signals_state = state;
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
+
+ awakable_list_.Add(awakable, signals, context);
+ return MOJO_RESULT_OK;
+}
+
+void MessagePipeDispatcher::RemoveAwakableImplNoLock(
+ Awakable* awakable,
+ HandleSignalsState* signals_state) {
+ lock().AssertAcquired();
+
+ awakable_list_.Remove(awakable);
+ if (signals_state)
+ *signals_state = GetHandleSignalsStateImplNoLock();
+}
+
+void MessagePipeDispatcher::StartSerializeImplNoLock(
+ size_t* max_size,
+ size_t* max_platform_handles) {
+ // see comment in dispatcher::startserialize
+ // DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
+
+ if (!serialized_) {
+ // handles the case where we have messages read off rawchannel but not
+ // ready by MojoReadMessage.
+ SerializeInternal();
+ }
+
+ *max_platform_handles = serialized_platform_handle_.is_valid() ? 1 : 0;
+
+ DCHECK_EQ(serialized_message_queue_.size() %
+ MessageInTransit::kMessageAlignment, 0U);
+ *max_size = sizeof(SerializedMessagePipeHandleDispatcher) +
+ serialized_message_queue_.size() +
+ serialized_read_buffer_.size();
+
+ DCHECK_LE(*max_size, TransportData::kMaxSerializedDispatcherSize);
+}
+
+bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
+ void* destination,
+ size_t* actual_size,
+ embedder::PlatformHandleVector* platform_handles) {
+ //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
+
+ CloseImplNoLock();
+ SerializedMessagePipeHandleDispatcher* serialization =
+ static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
+ if (serialized_platform_handle_.is_valid()) {
+ serialization->platform_handle_index = platform_handles->size();
+ platform_handles->push_back(serialized_platform_handle_);
+ } else {
+ serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
+ }
+ serialization->read_buffer_size = serialized_read_buffer_.size();
+
+ char* destination_char = static_cast<char*>(destination);
+ destination_char += sizeof(SerializedMessagePipeHandleDispatcher);
+
+ if (!serialized_read_buffer_.empty()) {
+ memcpy(destination_char, &serialized_read_buffer_[0],
+ serialized_read_buffer_.size());
+ destination_char += serialized_read_buffer_.size();
+ }
+
+
+ if (!serialized_message_queue_.empty()) {
+ memcpy(destination_char,
+ &serialized_message_queue_[0],
+ serialized_message_queue_.size());
+ }
+
+ *actual_size =
+ sizeof(SerializedMessagePipeHandleDispatcher) +
+ serialized_message_queue_.size() +
+ serialized_read_buffer_.size();
+
+ return true;
+}
+
+void MessagePipeDispatcher::TransportStarted() {
+ started_transport_.Acquire();
+}
+
+void MessagePipeDispatcher::TransportEnded() {
+ started_transport_.Release();
+
+ base::AutoLock locker(lock());
+
+ // If transporting of MPD failed, we might have got more data and didn't
+ // awake for.
+ // TODO(jam): should we care about only alerting if it was empty before
+ // TransportStarted?
+ if (!message_queue_.IsEmpty())
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+}
+
+void MessagePipeDispatcher::OnReadMessage(
+ const MessageInTransit::View& message_view,
+ embedder::ScopedPlatformHandleVectorPtr platform_handles) {
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
+ if (message_view.transport_data_buffer_size() > 0) {
+ DCHECK(message_view.transport_data_buffer());
+ message->SetDispatchers(TransportData::DeserializeDispatchers(
+ message_view.transport_data_buffer(),
+ message_view.transport_data_buffer_size(), platform_handles.Pass()));
+ }
+
+ if (started_transport_.Try()) {
+ // we're not in the middle of being sent
+
+ // Can get synchronously called back in Init if there was initial data.
+ scoped_ptr<base::AutoLock> locker;
+ if (!calling_init_) {
+ locker.reset(new base::AutoLock(lock()));
+ }
+
+ bool was_empty = message_queue_.IsEmpty();
+ message_queue_.AddMessage(message.Pass());
+ if (was_empty)
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+
+ started_transport_.Release();
+ } else {
+
+ // if RawChannel is calling OnRead, that means it has its read_lock_
+ // acquired. that means StartSerialize can't be accessing message queue as
+ // it waits on releasehandle first which acquires readlock_!
+ message_queue_.AddMessage(message.Pass());
+ }
+}
+
+void MessagePipeDispatcher::OnError(Error error) {
+ switch (error) {
+ case ERROR_READ_SHUTDOWN:
+ // The other side was cleanly closed, so this isn't actually an error.
+ DVLOG(1) << "MessagePipeDispatcher read error (shutdown)";
+ break;
+ case ERROR_READ_BROKEN:
+ LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)";
+ break;
+ case ERROR_READ_BAD_MESSAGE:
+ // Receiving a bad message means either a bug, data corruption, or
+ // malicious attack (probably due to some other bug).
+ LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)";
+ break;
+ case ERROR_READ_UNKNOWN:
+ LOG(ERROR) << "MessagePipeDispatcher read error (unknown)";
+ break;
+ case ERROR_WRITE:
+ // Write errors are slightly notable: they probably shouldn't happen under
+ // normal operation (but maybe the other side crashed).
+ LOG(WARNING) << "MessagePipeDispatcher write error";
+ break;
+ }
+
+ error_ = true;
+ if (started_transport_.Try()) {
+ base::AutoLock locker(lock());
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
+ channel_ = nullptr;
+ started_transport_.Release();
+ } else {
+ // We must be waiting to call ReleaseHandle. It will call Shutdown.
+ }
+}
+
+} // namespace system
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698