Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ebd07a3cdebe22ef538181429fa54d70f4f631ad |
--- /dev/null |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -0,0 +1,720 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/message_pipe_dispatcher.h" |
+ |
+#include "base/bind.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "mojo/edk/embedder/embedder_internal.h" |
+#include "mojo/edk/system/configuration.h" |
+#include "mojo/edk/system/message_in_transit.h" |
+#include "mojo/edk/system/options_validation.h" |
+#include "mojo/edk/system/transport_data.h" |
+ |
+namespace mojo { |
+namespace edk { |
+ |
+// TODO(jam): do more tests on using channel on same thread if it supports it ( |
+// i.e. with USE_CHROME_EDK and Windows). Also see ipc_channel_mojo.cc |
+bool g_use_channel_on_io_thread_only = true; |
+ |
+const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
+ |
+struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
+ size_t platform_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
+ size_t read_buffer_size; // any bytes after this are serialized messages |
+}; |
+ |
+// MessagePipeDispatcher ------------------------------------------------------- |
+ |
+const MojoCreateMessagePipeOptions |
+ MessagePipeDispatcher::kDefaultCreateOptions = { |
+ static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
+ |
+MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
+ const MojoCreateMessagePipeOptions* in_options, |
+ MojoCreateMessagePipeOptions* out_options) { |
+ const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; |
+ |
+ *out_options = kDefaultCreateOptions; |
+ if (!in_options) |
+ return MOJO_RESULT_OK; |
+ |
+ UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
+ if (!reader.is_valid()) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
+ return MOJO_RESULT_OK; |
+ if ((reader.options().flags & ~kKnownFlags)) |
+ return MOJO_RESULT_UNIMPLEMENTED; |
+ out_options->flags = reader.options().flags; |
+ |
+ // Checks for fields beyond |flags|: |
+ |
+ // (Nothing here yet.) |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void MessagePipeDispatcher::Init(ScopedPlatformHandle message_pipe) { |
+ InitWithReadBuffer(message_pipe.Pass(), nullptr, 0); |
+} |
+ |
+void MessagePipeDispatcher::InitWithReadBuffer( |
+ ScopedPlatformHandle message_pipe, |
+ char* data, |
+ size_t size) { |
+ if (message_pipe.get().is_valid()) { |
+ channel_ = RawChannel::Create(message_pipe.Pass()); |
+ |
+ // TODO(jam): It's probably cleaner to pass this in Init call. |
+ if (size) |
+ channel_->SetInitialReadBufferData(data, size); |
+ if (g_use_channel_on_io_thread_only) { |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
+ } else { |
+ InitOnIO(); |
+ } |
+ // TODO(jam): optimize for when running on IO thread? |
+ } |
+} |
+ |
+void MessagePipeDispatcher::InitOnIO() { |
+ base::AutoLock locker(lock()); |
+ calling_init_ = true; |
+ if (channel_) |
+ channel_->Init(this); |
+ calling_init_ = false; |
+} |
+ |
+void MessagePipeDispatcher::CloseOnIO() { |
+ base::AutoLock locker(lock()); |
+ |
+ if (channel_) { |
+ channel_->Shutdown(); |
+ channel_ = nullptr; |
+ } |
+} |
+ |
+Dispatcher::Type MessagePipeDispatcher::GetType() const { |
+ return Type::MESSAGE_PIPE; |
+} |
+ |
+// TODO(jam): this is copied from RawChannelWin till I figure out what's the |
+// best way we want to share this. Need to also consider posix which does |
+// require access to the RawChannel. |
+// Since this is used for serialization of messages read/written to a MP that |
+// aren't consumed by Mojo primitives yet, there could be an unbounded number of |
+// them when a MP is being sent. As a result, even for POSIX we will probably |
+// want to send the handles to the shell process and exchange them for tokens |
+// (since we can be sure that the shell will respond to our IPCs, compared to |
+// the other end where we're sending the MP to, which may not be reading...). |
+ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
+ size_t num_platform_handles, |
+ const void* platform_handle_table) { |
+ // TODO(jam): this code will have to be updated once it's used in a sandbox |
+ // and the receiving process doesn't have duplicate permission for the |
+ // receiver. Once there's a broker and we have a connection to it (possibly |
+ // through ConnectionManager), then we can make a sync IPC to it here to get a |
+ // token for this handle, and it will duplicate the handle to is process. Then |
+ // we pass the token to the receiver, which will then make a sync call to the |
+ // broker to get a duplicated handle. This will also allow us to avoid leaks |
+ // of the handle if the receiver dies, since the broker can notice that. |
+ DCHECK_GT(num_platform_handles, 0u); |
+ ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector()); |
+ |
+#if defined(OS_WIN) |
+ const char* serialization_data = |
+ static_cast<const char*>(platform_handle_table); |
+ for (size_t i = 0; i < num_platform_handles; i++) { |
+ DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data); |
+ serialization_data += sizeof(DWORD); |
+ HANDLE source_handle = *reinterpret_cast<const HANDLE*>(serialization_data); |
+ serialization_data += sizeof(HANDLE); |
+ base::Process sender = |
+ base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE); |
+ DCHECK(sender.IsValid()); |
+ HANDLE target_handle = NULL; |
+ BOOL dup_result = |
+ DuplicateHandle(sender.Handle(), source_handle, |
+ base::GetCurrentProcessHandle(), &target_handle, 0, |
+ FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); |
+ DCHECK(dup_result); |
+ rv->push_back(PlatformHandle(target_handle)); |
+ } |
+#else |
+ NOTREACHED() << "TODO(jam): implement"; |
+#endif |
+ return rv.Pass(); |
+} |
+ |
+scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
+ const void* source, |
+ size_t size, |
+ PlatformHandleVector* platform_handles) { |
+ const SerializedMessagePipeHandleDispatcher* serialization = |
+ static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
+ size_t platform_handle_index = serialization->platform_handle_index; |
+ |
+ |
+ // Starts off invalid, which is what we want. |
+ PlatformHandle platform_handle; |
+ |
+ if (platform_handle_index != kInvalidMessagePipeHandleIndex) { |
+ if (!platform_handles || |
+ platform_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) |
+ << "Invalid serialized platform handle dispatcher (missing handles)"; |
+ return nullptr; |
+ } |
+ |
+ // We take ownership of the handle, so we have to invalidate the one in |
+ // |platform_handles|. |
+ std::swap(platform_handle, (*platform_handles)[platform_handle_index]); |
+ } |
+ |
+ // TODO(jam): temporary until we send message_queue_ via shared memory. |
+ size -= sizeof(SerializedMessagePipeHandleDispatcher); |
+ const char* messages = static_cast<const char*>(source); |
+ messages += sizeof(SerializedMessagePipeHandleDispatcher); |
+ |
+ char* initial_read_data = nullptr; |
+ size_t initial_read_size = 0; |
+ |
+ if (serialization->read_buffer_size) { |
+ initial_read_data = const_cast<char*>(messages); |
+ initial_read_size = serialization->read_buffer_size; |
+ |
+ messages += initial_read_size; |
+ size -= initial_read_size; |
+ } |
+ |
+ scoped_refptr<MessagePipeDispatcher> rv( |
+ Create(MessagePipeDispatcher::kDefaultCreateOptions)); |
+ rv->InitWithReadBuffer( |
+ ScopedPlatformHandle(platform_handle), |
+ initial_read_data, initial_read_size); |
+ |
+ while (size) { |
+ size_t message_size; |
+ CHECK(MessageInTransit::GetNextMessageSize( |
+ messages, size, &message_size)); |
+ MessageInTransit::View message_view(message_size, messages); |
+ size -= message_size; |
+ messages += message_size; |
+ |
+ // TODO(jam): Copied below from RawChannelWin. See commment above |
+ // GetReadPlatformHandles. |
+ ScopedPlatformHandleVectorPtr platform_handles; |
brucedawson
2015/10/05 16:50:26
This shadows the platform_handles function paramet
|
+ if (message_view.transport_data_buffer()) { |
+ size_t num_platform_handles; |
+ const void* platform_handle_table; |
+ TransportData::GetPlatformHandleTable( |
+ message_view.transport_data_buffer(), &num_platform_handles, |
+ &platform_handle_table); |
+ |
+ if (num_platform_handles > 0) { |
+ platform_handles = |
+ GetReadPlatformHandles(num_platform_handles, |
+ platform_handle_table).Pass(); |
+ if (!platform_handles) { |
+ LOG(ERROR) << "Invalid number of platform handles received"; |
+ return nullptr; |
+ } |
+ } |
+ } |
+ |
+ // TODO(jam): Copied below from RawChannelWin. See commment above |
+ // GetReadPlatformHandles. |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
+ if (message_view.transport_data_buffer_size() > 0) { |
+ DCHECK(message_view.transport_data_buffer()); |
+ message->SetDispatchers(TransportData::DeserializeDispatchers( |
+ message_view.transport_data_buffer(), |
+ message_view.transport_data_buffer_size(), platform_handles.Pass())); |
+ } |
+ |
+ rv->message_queue_.AddMessage(message.Pass()); |
+ } |
+ |
+ return rv; |
+} |
+ |
+MessagePipeDispatcher::MessagePipeDispatcher() |
+ : channel_(nullptr), |
+ serialized_(false), |
+ calling_init_(false), |
+ error_(false) { |
+} |
+ |
+MessagePipeDispatcher::~MessagePipeDispatcher() { |
+ // |Close()|/|CloseImplNoLock()| should have taken care of the channel. |
+ DCHECK(!channel_); |
+} |
+ |
+void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
+ lock().AssertAcquired(); |
+ awakable_list_.CancelAll(); |
+} |
+ |
+void MessagePipeDispatcher::CloseImplNoLock() { |
+ lock().AssertAcquired(); |
+ if (g_use_channel_on_io_thread_only) { |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
+ } else { |
+ CloseOnIO(); |
+ } |
+} |
+ |
+void MessagePipeDispatcher::SerializeInternal() { |
+ // We need to stop watching handle immediately, even tho not on IO thread, so |
+ // that other messages aren't read after this. |
+ { |
+ if (channel_) { |
+ serialized_platform_handle_ = |
+ channel_->ReleaseHandle(&serialized_read_buffer_).release(); |
+ channel_ = nullptr; |
+ } else { |
+ // It's valid that the other side wrote some data and closed its end. |
+ } |
+ } |
+ |
+ DCHECK(serialized_message_queue_.empty()); |
+ // see comment in method below, this is only temporary till we implement a |
+ // solution with shared buffer |
+ while (!message_queue_.IsEmpty()) { |
+ scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
+ size_t cur_size = serialized_message_queue_.size(); |
+ |
+ |
+ // When MojoWriteMessage is called, the MessageInTransit doesn't have |
+ // dispatchers set and CreateEquivaent... is called since the dispatchers |
+ // can be referenced by others. here dispatchers aren't referenced by |
+ // others, but rawchannel can still call to them. so since we dont call |
+ // createequiv, manually call TransportStarted and TransportEnd. |
+ DispatcherVector dispatchers; |
+ if (message->has_dispatchers()) |
+ dispatchers = *message->dispatchers(); |
+ for (size_t i = 0; i < dispatchers.size(); ++i) |
+ dispatchers[i]->TransportStarted(); |
+ |
+ // TODO(jam): this handling for dispatchers only works on windows where we |
+ // send transportdata as bytes instead of as parameters to sendmsg. |
+ message->SerializeAndCloseDispatchers(); |
+ // cont'd below |
+ |
+ |
+ size_t main_buffer_size = message->main_buffer_size(); |
+ size_t transport_data_buffer_size = message->transport_data() ? |
+ message->transport_data()->buffer_size() : 0; |
+ size_t total_size = message->total_size(); |
+ |
+ serialized_message_queue_.resize(cur_size + total_size); |
+ memcpy(&serialized_message_queue_[cur_size], message->main_buffer(), |
+ main_buffer_size); |
+ |
+ // cont'd |
+ if (transport_data_buffer_size != 0) { |
+#if defined(OS_WIN) |
+ // TODO(jam): copied from RawChannelWin::WriteNoLock( |
+ if (RawChannel::GetSerializedPlatformHandleSize()) { |
+ char* serialization_data = |
+ static_cast<char*>(message->transport_data()->buffer()) + |
+ message->transport_data()->platform_handle_table_offset(); |
+ PlatformHandleVector* all_platform_handles = |
+ message->transport_data()->platform_handles(); |
+ if (all_platform_handles) { |
+ DWORD current_process_id = base::GetCurrentProcId(); |
+ for (size_t i = 0; i < all_platform_handles->size(); i++) { |
+ *reinterpret_cast<DWORD*>(serialization_data) = current_process_id; |
+ serialization_data += sizeof(DWORD); |
+ *reinterpret_cast<HANDLE*>(serialization_data) = |
+ all_platform_handles->at(i).handle; |
+ serialization_data += sizeof(HANDLE); |
+ all_platform_handles->at(i) = PlatformHandle(); |
+ } |
+ } |
+ } |
+ |
+ memcpy(&serialized_message_queue_[ |
+ cur_size + total_size - transport_data_buffer_size], |
+ message->transport_data()->buffer(), transport_data_buffer_size); |
+#else |
+ NOTREACHED() << "TODO(jam) implement"; |
+#endif |
+ } |
+ |
+ for (size_t i = 0; i < dispatchers.size(); ++i) |
+ dispatchers[i]->TransportEnded(); |
+ } |
+ |
+ serialized_ = true; |
+} |
+ |
+scoped_refptr<Dispatcher> |
+MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
+ lock().AssertAcquired(); |
+ |
+ SerializeInternal(); |
+ |
+ // TODO(vtl): Currently, there are no options, so we just use |
+ // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options |
+ // too. |
+ scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); |
+ rv->serialized_platform_handle_ = serialized_platform_handle_; |
+ serialized_platform_handle_ = PlatformHandle(); |
+ serialized_message_queue_.swap(rv->serialized_message_queue_); |
+ serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
+ rv->serialized_ = true; |
+ return scoped_refptr<Dispatcher>(rv.get()); |
+} |
+ |
+MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
+ const void* bytes, |
+ uint32_t num_bytes, |
+ std::vector<DispatcherTransport>* transports, |
+ MojoWriteMessageFlags flags) { |
+ |
+ DCHECK(!transports || |
+ (transports->size() > 0 && |
+ transports->size() <= GetConfiguration().max_message_num_handles)); |
+ |
+ lock().AssertAcquired(); |
+ |
+ if (!channel_) { |
+ DCHECK(error_); |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
+ |
+ if (num_bytes > GetConfiguration().max_message_num_bytes) |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
+ if (transports) { |
+ MojoResult result = AttachTransportsNoLock(message.get(), transports); |
+ if (result != MOJO_RESULT_OK) |
+ return result; |
+ } |
+ |
+ message->SerializeAndCloseDispatchers(); |
+ channel_->WriteMessage(message.Pass()); |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
+ void* bytes, |
+ uint32_t* num_bytes, |
+ DispatcherVector* dispatchers, |
+ uint32_t* num_dispatchers, |
+ MojoReadMessageFlags flags) { |
+ lock().AssertAcquired(); |
+ DCHECK(!dispatchers || dispatchers->empty()); |
+ |
+ const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
+ const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
+ |
+ if (message_queue_.IsEmpty()) { |
+ return error_ ? MOJO_RESULT_FAILED_PRECONDITION |
+ : MOJO_RESULT_SHOULD_WAIT; |
+ } |
+ |
+ // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
+ // and release the lock immediately. |
+ bool enough_space = true; |
+ MessageInTransit* message = message_queue_.PeekMessage(); |
+ if (num_bytes) |
+ *num_bytes = message->num_bytes(); |
+ if (message->num_bytes() <= max_bytes) |
+ memcpy(bytes, message->bytes(), message->num_bytes()); |
+ else |
+ enough_space = false; |
+ |
+ if (DispatcherVector* queued_dispatchers = message->dispatchers()) { |
+ if (num_dispatchers) |
+ *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); |
+ if (enough_space) { |
+ if (queued_dispatchers->empty()) { |
+ // Nothing to do. |
+ } else if (queued_dispatchers->size() <= max_num_dispatchers) { |
+ DCHECK(dispatchers); |
+ dispatchers->swap(*queued_dispatchers); |
+ } else { |
+ enough_space = false; |
+ } |
+ } |
+ } else { |
+ if (num_dispatchers) |
+ *num_dispatchers = 0; |
+ } |
+ |
+ message = nullptr; |
+ |
+ if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
+ message_queue_.DiscardMessage(); |
+ |
+ // Now it's empty, thus no longer readable. |
+ if (message_queue_.IsEmpty()) { |
+ // It's currently not possible to wait for non-readability, but we should |
+ // do the state change anyway. |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ } |
+ } |
+ |
+ if (!enough_space) |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
+ const { |
+ lock().AssertAcquired(); |
+ |
+ HandleSignalsState rv; |
+ if (!message_queue_.IsEmpty()) { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ } |
+ if (!error_) { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
+ rv.satisfiable_signals |= |
+ MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE; |
+ } else { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ } |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ return rv; |
+} |
+ |
+MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
+ Awakable* awakable, |
+ MojoHandleSignals signals, |
+ uint32_t context, |
+ HandleSignalsState* signals_state) { |
+ lock().AssertAcquired(); |
+ HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
+ if (state.satisfies(signals)) { |
+ if (signals_state) |
+ *signals_state = state; |
+ return MOJO_RESULT_ALREADY_EXISTS; |
+ } |
+ if (!state.can_satisfy(signals)) { |
+ if (signals_state) |
+ *signals_state = state; |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
+ |
+ awakable_list_.Add(awakable, signals, context); |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void MessagePipeDispatcher::RemoveAwakableImplNoLock( |
+ Awakable* awakable, |
+ HandleSignalsState* signals_state) { |
+ lock().AssertAcquired(); |
+ |
+ awakable_list_.Remove(awakable); |
+ if (signals_state) |
+ *signals_state = GetHandleSignalsStateImplNoLock(); |
+} |
+ |
+void MessagePipeDispatcher::StartSerializeImplNoLock( |
+ size_t* max_size, |
+ size_t* max_platform_handles) { |
+ if (!serialized_) |
+ SerializeInternal(); |
+ |
+ *max_platform_handles = serialized_platform_handle_.is_valid() ? 1 : 0; |
+ |
+ DCHECK_EQ(serialized_message_queue_.size() % |
+ MessageInTransit::kMessageAlignment, 0U); |
+ *max_size = sizeof(SerializedMessagePipeHandleDispatcher) + |
+ serialized_message_queue_.size() + |
+ serialized_read_buffer_.size(); |
+ |
+ DCHECK_LE(*max_size, TransportData::kMaxSerializedDispatcherSize); |
+} |
+ |
+bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
+ void* destination, |
+ size_t* actual_size, |
+ PlatformHandleVector* platform_handles) { |
+ CloseImplNoLock(); |
+ SerializedMessagePipeHandleDispatcher* serialization = |
+ static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
+ if (serialized_platform_handle_.is_valid()) { |
+ serialization->platform_handle_index = platform_handles->size(); |
+ platform_handles->push_back(serialized_platform_handle_); |
+ } else { |
+ serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
+ } |
+ serialization->read_buffer_size = serialized_read_buffer_.size(); |
+ |
+ char* destination_char = static_cast<char*>(destination); |
+ destination_char += sizeof(SerializedMessagePipeHandleDispatcher); |
+ |
+ if (!serialized_read_buffer_.empty()) { |
+ memcpy(destination_char, &serialized_read_buffer_[0], |
+ serialized_read_buffer_.size()); |
+ destination_char += serialized_read_buffer_.size(); |
+ } |
+ |
+ |
+ if (!serialized_message_queue_.empty()) { |
+ memcpy(destination_char, |
+ &serialized_message_queue_[0], |
+ serialized_message_queue_.size()); |
+ } |
+ |
+ *actual_size = |
+ sizeof(SerializedMessagePipeHandleDispatcher) + |
+ serialized_message_queue_.size() + |
+ serialized_read_buffer_.size(); |
+ |
+ return true; |
+} |
+ |
+void MessagePipeDispatcher::TransportStarted() { |
+ started_transport_.Acquire(); |
+} |
+ |
+void MessagePipeDispatcher::TransportEnded() { |
+ started_transport_.Release(); |
+ |
+ base::AutoLock locker(lock()); |
+ |
+ // If transporting of MPD failed, we might have got more data and didn't |
+ // awake for. |
+ // TODO(jam): should we care about only alerting if it was empty before |
+ // TransportStarted? |
+ if (!message_queue_.IsEmpty()) |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+} |
+ |
+void MessagePipeDispatcher::OnReadMessage( |
+ const MessageInTransit::View& message_view, |
+ ScopedPlatformHandleVectorPtr platform_handles) { |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
+ if (message_view.transport_data_buffer_size() > 0) { |
+ DCHECK(message_view.transport_data_buffer()); |
+ message->SetDispatchers(TransportData::DeserializeDispatchers( |
+ message_view.transport_data_buffer(), |
+ message_view.transport_data_buffer_size(), platform_handles.Pass())); |
+ } |
+ |
+ if (started_transport_.Try()) { |
+ // we're not in the middle of being sent |
+ |
+ // Can get synchronously called back in Init if there was initial data. |
+ scoped_ptr<base::AutoLock> locker; |
+ if (!calling_init_) { |
+ locker.reset(new base::AutoLock(lock())); |
+ } |
+ |
+ bool was_empty = message_queue_.IsEmpty(); |
+ message_queue_.AddMessage(message.Pass()); |
+ if (was_empty) |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ |
+ started_transport_.Release(); |
+ } else { |
+ |
+ // if RawChannel is calling OnRead, that means it has its read_lock_ |
+ // acquired. that means StartSerialize can't be accessing message queue as |
+ // it waits on releasehandle first which acquires readlock_! |
+ message_queue_.AddMessage(message.Pass()); |
+ } |
+} |
+ |
+void MessagePipeDispatcher::OnError(Error error) { |
+ switch (error) { |
+ case ERROR_READ_SHUTDOWN: |
+ // The other side was cleanly closed, so this isn't actually an error. |
+ DVLOG(1) << "MessagePipeDispatcher read error (shutdown)"; |
+ break; |
+ case ERROR_READ_BROKEN: |
+ LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)"; |
+ break; |
+ case ERROR_READ_BAD_MESSAGE: |
+ // Receiving a bad message means either a bug, data corruption, or |
+ // malicious attack (probably due to some other bug). |
+ LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)"; |
+ break; |
+ case ERROR_READ_UNKNOWN: |
+ LOG(ERROR) << "MessagePipeDispatcher read error (unknown)"; |
+ break; |
+ case ERROR_WRITE: |
+ // Write errors are slightly notable: they probably shouldn't happen under |
+ // normal operation (but maybe the other side crashed). |
+ LOG(WARNING) << "MessagePipeDispatcher write error"; |
+ break; |
+ } |
+ |
+ error_ = true; |
+ if (started_transport_.Try()) { |
+ base::AutoLock locker(lock()); |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); |
+ channel_ = nullptr; |
+ started_transport_.Release(); |
+ } else { |
+ // We must be waiting to call ReleaseHandle. It will call Shutdown. |
+ } |
+} |
+ |
+MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
+ MessageInTransit* message, |
+ std::vector<DispatcherTransport>* transports) { |
+ DCHECK(!message->has_dispatchers()); |
+ |
+ // You're not allowed to send either handle to a message pipe over the message |
+ // pipe, so check for this. (The case of trying to write a handle to itself is |
+ // taken care of by |Core|. That case kind of makes sense, but leads to |
+ // complications if, e.g., both sides try to do the same thing with their |
+ // respective handles simultaneously. The other case, of trying to write the |
+ // peer handle to a handle, doesn't make sense -- since no handle will be |
+ // available to read the message from.) |
+ for (size_t i = 0; i < transports->size(); i++) { |
+ if (!(*transports)[i].is_valid()) |
+ continue; |
+ if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { |
+ MessagePipeDispatcher* mp = |
+ static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); |
+ if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { |
+ // The other case should have been disallowed by |Core|. (Note: |port| |
+ // is the peer port of the handle given to |WriteMessage()|.) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ } |
+ } |
+ } |
+ |
+ // Clone the dispatchers and attach them to the message. (This must be done as |
+ // a separate loop, since we want to leave the dispatchers alone on failure.) |
+ scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
+ dispatchers->reserve(transports->size()); |
+ for (size_t i = 0; i < transports->size(); i++) { |
+ if ((*transports)[i].is_valid()) { |
+ dispatchers->push_back( |
+ (*transports)[i].CreateEquivalentDispatcherAndClose()); |
+ } else { |
+ LOG(WARNING) << "Enqueueing null dispatcher"; |
+ dispatchers->push_back(nullptr); |
+ } |
+ } |
+ message->SetDispatchers(dispatchers.Pass()); |
+ return MOJO_RESULT_OK; |
+} |
+ |
+} // namespace edk |
+} // namespace mojo |