Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2bdeca2f4b184316fdc098c2a35d15b89c1cf51b |
--- /dev/null |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -0,0 +1,724 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/message_pipe_dispatcher.h" |
+ |
+#include "base/bind.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "mojo/edk/embedder/embedder_internal.h" |
+#include "mojo/edk/system/configuration.h" |
+#include "mojo/edk/system/memory.h" |
+#include "mojo/edk/system/message_in_transit.h" |
+#include "mojo/edk/system/options_validation.h" |
+#include "mojo/edk/system/transport_data.h" |
+ |
+// TODO(jam): do more tests on using channel on same thread if it supports it ( |
+// i.e. with USE_CHROME_EDK and Windows). Also see ipc_channel_mojo.cc |
+bool g_use_channel_on_io = true; |
+ |
+namespace mojo { |
+namespace system { |
+ |
+const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
+ |
+struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
+ size_t platform_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
+ size_t read_buffer_size; // any bytes after this are serialized messages |
+}; |
+ |
+// MessagePipeDispatcher ------------------------------------------------------- |
+ |
+const MojoCreateMessagePipeOptions |
+ MessagePipeDispatcher::kDefaultCreateOptions = { |
+ static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
+ |
+MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
+ UserPointer<const MojoCreateMessagePipeOptions> in_options, |
+ MojoCreateMessagePipeOptions* out_options) { |
+ const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; |
+ |
+ *out_options = kDefaultCreateOptions; |
+ if (in_options.IsNull()) |
+ return MOJO_RESULT_OK; |
+ |
+ UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
+ if (!reader.is_valid()) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
+ return MOJO_RESULT_OK; |
+ if ((reader.options().flags & ~kKnownFlags)) |
+ return MOJO_RESULT_UNIMPLEMENTED; |
+ out_options->flags = reader.options().flags; |
+ |
+ // Checks for fields beyond |flags|: |
+ |
+ // (Nothing here yet.) |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void MessagePipeDispatcher::Init(embedder::ScopedPlatformHandle message_pipe) { |
+ InitWithReadBuffer(message_pipe.Pass(), nullptr, 0); |
+} |
+ |
+void MessagePipeDispatcher::InitWithReadBuffer( |
+ embedder::ScopedPlatformHandle message_pipe, |
+ char* data, |
+ size_t size) { |
+ if (message_pipe.get().is_valid()) { |
+ channel_ = RawChannel::Create(message_pipe.Pass()); |
+ |
+ |
+ |
+ |
+ // TODO(jam): pass this in Init call.... |
+ if (size) |
+ channel_->SetInitialReadBufferData(data, size); |
+ if (g_use_channel_on_io) { |
+ mojo::embedder::internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
+ } else { |
+ InitOnIO(); |
+ } |
+ // TODO(jam): optimize for when running on IO thread |
+ } |
+} |
+ |
+void MessagePipeDispatcher::InitOnIO() { |
+ base::AutoLock locker(lock()); |
+ calling_init_ = true; |
+ if (channel_) |
+ channel_->Init(this); |
+ calling_init_ = false; |
+} |
+ |
+void MessagePipeDispatcher::CloseOnIO() { |
+ base::AutoLock locker(lock()); |
+ |
+ // TODO(jam) CLEANUP! this should be done inside RawChannel..... |
+ if (channel_) { |
+ channel_->Shutdown(); |
+ channel_ = nullptr; |
+ } |
+} |
+ |
+Dispatcher::Type MessagePipeDispatcher::GetType() const { |
+ return Type::MESSAGE_PIPE; |
+} |
+ |
+ |
+ |
+// TODO(jam): this is copied from RawChannelWin till I figure out what's the |
+// best way we want to share this. Need to also consider posix which does |
+// require access to the RawChannel. |
+embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
+ size_t num_platform_handles, |
+ const void* platform_handle_table) { |
+ // TODO(jam): this code will have to be updated once it's used in a sandbox |
+ // and the receiving process doesn't have duplicate permission for the |
+ // receiver. Once there's a broker and we have a connection to it (possibly |
+ // through ConnectionManager), then we can make a sync IPC to it here to get a |
+ // token for this handle, and it will duplicate the handle to is process. Then |
+ // we pass the token to the receiver, which will then make a sync call to the |
+ // broker to get a duplicated handle. This will also allow us to avoid leaks |
+ // of the handle if the receiver dies, since the broker can notice that. |
yzshen1
2015/09/23 22:47:08
(Just wonder whether you've come up with some new
|
+ DCHECK_GT(num_platform_handles, 0u); |
+ embedder::ScopedPlatformHandleVectorPtr rv( |
+ new embedder::PlatformHandleVector()); |
+ |
+#if defined(OS_WIN) |
+ const char* serialization_data = |
+ static_cast<const char*>(platform_handle_table); |
+ for (size_t i = 0; i < num_platform_handles; i++) { |
+ DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data); |
+ serialization_data += sizeof(DWORD); |
+ HANDLE source_handle = *reinterpret_cast<const HANDLE*>(serialization_data); |
+ serialization_data += sizeof(HANDLE); |
+ base::Process sender = |
+ base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE); |
+ DCHECK(sender.IsValid()); |
+ HANDLE target_handle = NULL; |
+ BOOL dup_result = |
+ DuplicateHandle(sender.Handle(), source_handle, |
+ base::GetCurrentProcessHandle(), &target_handle, 0, |
+ FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); |
+ DCHECK(dup_result); |
+ rv->push_back(embedder::PlatformHandle(target_handle)); |
+ } |
+#else |
+ NOTREACHED() << "TODO(jam): implement"; |
+#endif |
+ return rv.Pass(); |
+} |
+ |
+scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
+ const void* source, |
+ size_t size, |
+ embedder::PlatformHandleVector* platform_handles) { |
+ const SerializedMessagePipeHandleDispatcher* serialization = |
+ static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
+ size_t platform_handle_index = serialization->platform_handle_index; |
+ |
+ |
+ // Starts off invalid, which is what we want. |
+ embedder::PlatformHandle platform_handle; |
+ |
+ if (platform_handle_index != kInvalidMessagePipeHandleIndex) { |
+ if (!platform_handles || |
+ platform_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) |
+ << "Invalid serialized platform handle dispatcher (missing handles)"; |
+ return nullptr; |
+ } |
+ |
+ // We take ownership of the handle, so we have to invalidate the one in |
+ // |platform_handles|. |
+ std::swap(platform_handle, (*platform_handles)[platform_handle_index]); |
+ } |
+ |
+ |
+ // TODO(jam): temporary until we send message_queue_ via shared memory. |
+ size -= sizeof(SerializedMessagePipeHandleDispatcher); |
+ const char* messages = static_cast<const char*>(source); |
+ messages += sizeof(SerializedMessagePipeHandleDispatcher); |
+ |
+ |
+ char* initial_read_data = nullptr; |
+ size_t initial_read_size = 0; |
+ |
+ if (serialization->read_buffer_size) { |
+ initial_read_data = const_cast<char*>(messages); |
+ initial_read_size = serialization->read_buffer_size; |
+ |
+ messages += initial_read_size; |
+ size -= initial_read_size; |
+ } |
+ |
+ scoped_refptr<MessagePipeDispatcher> rv( |
+ Create(MessagePipeDispatcher::kDefaultCreateOptions)); |
+ rv->InitWithReadBuffer( |
+ embedder::ScopedPlatformHandle(platform_handle), |
+ initial_read_data, initial_read_size); |
+ |
+ while (size) { |
+ size_t message_size; |
+ CHECK(MessageInTransit::GetNextMessageSize( |
+ messages, size, &message_size)); |
+ MessageInTransit::View message_view(message_size, messages); |
+ size -= message_size; |
+ messages += message_size; |
+ |
+ // copied from RawChannel::OnReadCompleted |
+ // TODO(jam): don't copy |
+ embedder::ScopedPlatformHandleVectorPtr platform_handles; |
+ if (message_view.transport_data_buffer()) { |
+ size_t num_platform_handles; |
+ const void* platform_handle_table; |
+ TransportData::GetPlatformHandleTable( |
+ message_view.transport_data_buffer(), &num_platform_handles, |
+ &platform_handle_table); |
+ |
+ if (num_platform_handles > 0) { |
+ platform_handles = |
+ GetReadPlatformHandles(num_platform_handles, |
+ platform_handle_table).Pass(); |
+ if (!platform_handles) { |
+ LOG(ERROR) << "Invalid number of platform handles received"; |
+ return nullptr; |
+ } |
+ } |
+ } |
+ |
+ |
+ // copied below from OnReadMessage |
+ // TODO(jam): don't copy |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
+ if (message_view.transport_data_buffer_size() > 0) { |
+ DCHECK(message_view.transport_data_buffer()); |
+ message->SetDispatchers(TransportData::DeserializeDispatchers( |
+ message_view.transport_data_buffer(), |
+ message_view.transport_data_buffer_size(), platform_handles.Pass())); |
+ } |
+ |
+ rv->message_queue_.AddMessage(message.Pass()); |
+ } |
+ |
+ return rv; |
+} |
+ |
+MessagePipeDispatcher::MessagePipeDispatcher() |
+ : channel_(nullptr), |
+ serialized_(false), |
+ calling_init_(false), |
+ error_(false) { |
+} |
+ |
+MessagePipeDispatcher::~MessagePipeDispatcher() { |
+ // |Close()|/|CloseImplNoLock()| should have taken care of the channel. |
+ DCHECK(!channel_); |
+} |
+ |
+void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
+ lock().AssertAcquired(); |
+ awakable_list_.CancelAll(); |
+} |
+ |
+void MessagePipeDispatcher::CloseImplNoLock() { |
+ lock().AssertAcquired(); |
+ if (g_use_channel_on_io) { |
+ mojo::embedder::internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
+ } else { |
+ CloseOnIO(); |
+ } |
+} |
+ |
+void MessagePipeDispatcher::SerializeInternal() { |
+ // need to stop watching handle immediately, even tho not on IO thread, so |
+ // that other messages aren't read after this. |
+ { |
+ if (channel_) { |
+ serialized_platform_handle_ = |
+ channel_->ReleaseHandle(&serialized_read_buffer_).release(); |
+ |
+ channel_ = nullptr; |
yzshen1
2015/09/23 22:47:08
This seems unintended because some places expect t
|
+ } else { |
+ // It's valid that the other side wrote some data and closed its end. |
+ } |
+ } |
+ |
+ DCHECK(serialized_message_queue_.empty()); |
+ // see comment in method below, this is only temporary till we implement a |
+ // solution with shared buffer |
+ while (!message_queue_.IsEmpty()) { |
+ scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
+ size_t cur_size = serialized_message_queue_.size(); |
+ |
+ |
+ // When MojoWriteMessage is called, the MessageInTransit doesn't have |
+ // dispatchers set and CreateEquivaent... is called since the dispatchers |
+ // can be referenced by others. here dispatchers aren't referenced by |
+ // others, but rawchannel can still call to them. so since we dont call |
+ // createequiv, manually call TransportStarted and TransportEnd. |
+ DispatcherVector dispatchers; |
+ if (message->has_dispatchers()) |
+ dispatchers = *message->dispatchers(); |
+ for (size_t i = 0; i < dispatchers.size(); ++i) |
+ dispatchers[i]->TransportStarted(); |
+ |
+ //TODO(jam): this handling for dispatchers only works on windows where we |
+ //send transportdata as bytes instead of as parameters to sendmsg. |
+ message->SerializeAndCloseDispatchers(); |
+ // cont'd below |
+ |
+ |
+ size_t main_buffer_size = message->main_buffer_size(); |
+ size_t transport_data_buffer_size = message->transport_data() ? |
+ message->transport_data()->buffer_size() : 0; |
+ size_t total_size = message->total_size(); |
+ |
+ |
+ |
+ serialized_message_queue_.resize(cur_size + total_size); |
+ memcpy(&serialized_message_queue_[cur_size], message->main_buffer(), |
+ main_buffer_size); |
+ |
+ // cont'd |
+ if (transport_data_buffer_size != 0) { |
+#if defined(OS_WIN) |
+ // TODO(jam): copied from RawChannelWin::WriteNoLock( |
+ if (channel_->GetSerializedPlatformHandleSize()) { |
yzshen1
2015/09/23 22:47:08
channel_ has been set to null on line 289?
|
+ char* serialization_data = |
+ static_cast<char*>(message->transport_data()->buffer()) + |
+ message->transport_data()->platform_handle_table_offset(); |
+ embedder::PlatformHandleVector* all_platform_handles = |
+ message->transport_data()->platform_handles(); |
+ if (all_platform_handles) { |
+ DWORD current_process_id = base::GetCurrentProcId(); |
+ for (size_t i = 0; i < all_platform_handles->size(); i++) { |
+ *reinterpret_cast<DWORD*>(serialization_data) = current_process_id; |
+ serialization_data += sizeof(DWORD); |
+ *reinterpret_cast<HANDLE*>(serialization_data) = |
+ all_platform_handles->at(i).handle; |
+ serialization_data += sizeof(HANDLE); |
+ all_platform_handles->at(i) = embedder::PlatformHandle(); |
+ } |
+ } |
+ } |
+ |
+ memcpy(&serialized_message_queue_[ |
+ cur_size + total_size - transport_data_buffer_size], |
+ message->transport_data()->buffer(), transport_data_buffer_size); |
+#else |
+ NOTREACHED() << "TODO(jam) implement"; |
+#endif |
+ } |
+ |
+ for (size_t i = 0; i < dispatchers.size(); ++i) |
+ dispatchers[i]->TransportEnded(); |
+ } |
+ |
+ serialized_ = true; |
+} |
+ |
+scoped_refptr<Dispatcher> |
+MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
+ lock().AssertAcquired(); |
+ |
+ SerializeInternal(); |
+ |
+ // TODO(vtl): Currently, there are no options, so we just use |
+ // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options |
+ // too. |
+ scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); |
+ rv->channel_ = channel_; |
yzshen1
2015/09/23 22:47:08
|channel_| has been set to nullptr in SerializeInt
|
+ channel_ = nullptr; |
+ |
+ |
+ rv->serialized_platform_handle_ = serialized_platform_handle_; |
+ serialized_platform_handle_ = mojo::embedder::PlatformHandle(); |
+ serialized_message_queue_.swap(rv->serialized_message_queue_); |
+ serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
+ rv->serialized_ = true; |
+ return scoped_refptr<Dispatcher>(rv.get()); |
+} |
+ |
+MojoResult AttachTransportsNoLock( |
+ MessageInTransit* message, |
+ std::vector<DispatcherTransport>* transports) { |
+ DCHECK(!message->has_dispatchers()); |
+ scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
+ dispatchers->reserve(transports->size()); |
+ for (size_t i = 0; i < transports->size(); i++) { |
+ if ((*transports)[i].is_valid()) { |
+ dispatchers->push_back( |
+ (*transports)[i].CreateEquivalentDispatcherAndClose()); |
+ } else { |
+ LOG(WARNING) << "Enqueueing null dispatcher"; |
+ dispatchers->push_back(nullptr); |
+ } |
+ } |
+ message->SetDispatchers(dispatchers.Pass()); |
+ return MOJO_RESULT_OK; |
+} |
+ |
+ |
+MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
+ UserPointer<const void> bytes, |
+ uint32_t num_bytes, |
+ std::vector<DispatcherTransport>* transports, |
+ MojoWriteMessageFlags flags) { |
+ |
+ DCHECK(!transports || |
+ (transports->size() > 0 && |
+ transports->size() <= GetConfiguration().max_message_num_handles)); |
+ |
+ lock().AssertAcquired(); |
+ |
+ if (!channel_) { |
+ DCHECK(error_); |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
+ |
+ if (num_bytes > GetConfiguration().max_message_num_bytes) |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
+ if (transports) { |
+ MojoResult result = AttachTransportsNoLock(message.get(), transports); |
+ if (result != MOJO_RESULT_OK) |
+ return result; |
+ } |
+ |
+ // TODO(jam): pass in GetSerializedPlatformHandleSize instead of RawChannel |
+ message->SerializeAndCloseDispatchers(); |
+ channel_->WriteMessage(message.Pass()); |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
+ UserPointer<void> bytes, |
+ UserPointer<uint32_t> num_bytes, |
+ DispatcherVector* dispatchers, |
+ uint32_t* num_dispatchers, |
+ MojoReadMessageFlags flags) { |
+ lock().AssertAcquired(); |
+// return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers, |
+// num_dispatchers, flags); |
+ |
+ DCHECK(!dispatchers || dispatchers->empty()); |
+ |
+ const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get(); |
+ const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
+ |
+ if (message_queue_.IsEmpty()) { |
+ return error_ ? MOJO_RESULT_FAILED_PRECONDITION |
+ : MOJO_RESULT_SHOULD_WAIT; |
+ } |
+ |
+ // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
+ // and release the lock immediately. |
+ bool enough_space = true; |
+ MessageInTransit* message = message_queue_.PeekMessage(); |
+ if (!num_bytes.IsNull()) |
+ num_bytes.Put(message->num_bytes()); |
+ if (message->num_bytes() <= max_bytes) |
+ bytes.PutArray(message->bytes(), message->num_bytes()); |
+ else |
+ enough_space = false; |
+ |
+ if (DispatcherVector* queued_dispatchers = message->dispatchers()) { |
+ if (num_dispatchers) |
+ *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); |
+ if (enough_space) { |
+ if (queued_dispatchers->empty()) { |
+ // Nothing to do. |
+ } else if (queued_dispatchers->size() <= max_num_dispatchers) { |
+ DCHECK(dispatchers); |
+ dispatchers->swap(*queued_dispatchers); |
+ } else { |
+ enough_space = false; |
+ } |
+ } |
+ } else { |
+ if (num_dispatchers) |
+ *num_dispatchers = 0; |
+ } |
+ |
+ message = nullptr; |
+ |
+ if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
+ message_queue_.DiscardMessage(); |
+ |
+ // Now it's empty, thus no longer readable. |
+ if (message_queue_.IsEmpty()) { |
+ // It's currently not possible to wait for non-readability, but we should |
+ // do the state change anyway. |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ } |
+ } |
+ |
+ if (!enough_space) |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
+ const { |
+ lock().AssertAcquired(); |
+// return message_pipe_->GetHandleSignalsState(port_); |
+ |
+ HandleSignalsState rv; |
+ if (!message_queue_.IsEmpty()) { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ } |
+ if (!error_) { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
+ rv.satisfiable_signals |= |
+ MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE; |
+ } else { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ } |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ return rv; |
+} |
+ |
+MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
+ Awakable* awakable, |
+ MojoHandleSignals signals, |
+ uint32_t context, |
+ HandleSignalsState* signals_state) { |
+ lock().AssertAcquired(); |
+// return message_pipe_->AddAwakable(port_, awakable, signals, context, |
+// signals_state); |
+ HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
+ if (state.satisfies(signals)) { |
+ if (signals_state) |
+ *signals_state = state; |
+ return MOJO_RESULT_ALREADY_EXISTS; |
+ } |
+ if (!state.can_satisfy(signals)) { |
+ if (signals_state) |
+ *signals_state = state; |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
+ |
+ awakable_list_.Add(awakable, signals, context); |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void MessagePipeDispatcher::RemoveAwakableImplNoLock( |
+ Awakable* awakable, |
+ HandleSignalsState* signals_state) { |
+ lock().AssertAcquired(); |
+ |
+ awakable_list_.Remove(awakable); |
+ if (signals_state) |
+ *signals_state = GetHandleSignalsStateImplNoLock(); |
+} |
+ |
+void MessagePipeDispatcher::StartSerializeImplNoLock( |
+ size_t* max_size, |
+ size_t* max_platform_handles) { |
+ // see comment in dispatcher::startserialize |
+ // DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
+ |
+ if (!serialized_) { |
+ // handles the case where we have messages read off rawchannel but not |
+ // ready by MojoReadMessage. |
+ SerializeInternal(); |
+ } |
+ |
+ *max_platform_handles = serialized_platform_handle_.is_valid() ? 1 : 0; |
+ |
+ DCHECK_EQ(serialized_message_queue_.size() % |
+ MessageInTransit::kMessageAlignment, 0U); |
+ *max_size = sizeof(SerializedMessagePipeHandleDispatcher) + |
+ serialized_message_queue_.size() + |
+ serialized_read_buffer_.size(); |
+ |
+ DCHECK_LE(*max_size, TransportData::kMaxSerializedDispatcherSize); |
+} |
+ |
+bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
+ void* destination, |
+ size_t* actual_size, |
+ embedder::PlatformHandleVector* platform_handles) { |
+ //DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
+ |
+ CloseImplNoLock(); |
+ SerializedMessagePipeHandleDispatcher* serialization = |
+ static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
+ if (serialized_platform_handle_.is_valid()) { |
+ serialization->platform_handle_index = platform_handles->size(); |
+ platform_handles->push_back(serialized_platform_handle_); |
+ } else { |
+ serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
+ } |
+ serialization->read_buffer_size = serialized_read_buffer_.size(); |
+ |
+ char* destination_char = static_cast<char*>(destination); |
+ destination_char += sizeof(SerializedMessagePipeHandleDispatcher); |
+ |
+ if (!serialized_read_buffer_.empty()) { |
+ memcpy(destination_char, &serialized_read_buffer_[0], |
+ serialized_read_buffer_.size()); |
+ destination_char += serialized_read_buffer_.size(); |
+ } |
+ |
+ |
+ if (!serialized_message_queue_.empty()) { |
+ memcpy(destination_char, |
+ &serialized_message_queue_[0], |
+ serialized_message_queue_.size()); |
+ } |
+ |
+ *actual_size = |
+ sizeof(SerializedMessagePipeHandleDispatcher) + |
+ serialized_message_queue_.size() + |
+ serialized_read_buffer_.size(); |
+ |
+ return true; |
+} |
+ |
+void MessagePipeDispatcher::TransportStarted() { |
+ started_transport_.Acquire(); |
+} |
+ |
+void MessagePipeDispatcher::TransportEnded() { |
+ started_transport_.Release(); |
+ |
+ base::AutoLock locker(lock()); |
+ |
+ // If transporting of MPD failed, we might have got more data and didn't |
+ // awake for. |
+ // TODO(jam): should we care about only alerting if it was empty before |
+ // TransportStarted? |
+ if (!message_queue_.IsEmpty()) |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+} |
+ |
+void MessagePipeDispatcher::OnReadMessage( |
+ const MessageInTransit::View& message_view, |
+ embedder::ScopedPlatformHandleVectorPtr platform_handles) { |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
+ if (message_view.transport_data_buffer_size() > 0) { |
+ DCHECK(message_view.transport_data_buffer()); |
+ message->SetDispatchers(TransportData::DeserializeDispatchers( |
+ message_view.transport_data_buffer(), |
+ message_view.transport_data_buffer_size(), platform_handles.Pass())); |
+ } |
+ |
+ if (started_transport_.Try()) { |
+ // we're not in the middle of being sent |
+ |
+ // Can get synchronously called back in Init if there was initial data. |
+ scoped_ptr<base::AutoLock> locker; |
+ if (!calling_init_) { |
+ locker.reset(new base::AutoLock(lock())); |
+ } |
+ |
+ bool was_empty = message_queue_.IsEmpty(); |
+ message_queue_.AddMessage(message.Pass()); |
+ if (was_empty) |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ |
+ started_transport_.Release(); |
+ } else { |
+ |
+ // if RawChannel is calling OnRead, that means it has its read_lock_ |
+ // acquired. that means StartSerialize can't be accessing message queue as |
+ // it waits on releasehandle first which acquires readlock_! |
+ message_queue_.AddMessage(message.Pass()); |
+ } |
+} |
+ |
+void MessagePipeDispatcher::OnError(Error error) { |
+ switch (error) { |
+ case ERROR_READ_SHUTDOWN: |
+ // The other side was cleanly closed, so this isn't actually an error. |
+ DVLOG(1) << "MessagePipeDispatcher read error (shutdown)"; |
+ break; |
+ case ERROR_READ_BROKEN: |
+ LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)"; |
+ break; |
+ case ERROR_READ_BAD_MESSAGE: |
+ // Receiving a bad message means either a bug, data corruption, or |
+ // malicious attack (probably due to some other bug). |
+ LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)"; |
+ break; |
+ case ERROR_READ_UNKNOWN: |
+ LOG(ERROR) << "MessagePipeDispatcher read error (unknown)"; |
+ break; |
+ case ERROR_WRITE: |
+ // Write errors are slightly notable: they probably shouldn't happen under |
+ // normal operation (but maybe the other side crashed). |
+ LOG(WARNING) << "MessagePipeDispatcher write error"; |
+ break; |
+ } |
+ |
+ error_ = true; |
+ if (started_transport_.Try()) { |
+ base::AutoLock locker(lock()); |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); |
+ channel_ = nullptr; |
+ started_transport_.Release(); |
+ } else { |
+ // We must be waiting to call ReleaseHandle. It will call Shutdown. |
+ } |
+} |
+ |
+} // namespace system |
+} // namespace mojo |