Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index 687710cf94e78aed7d2fafa2a2971b8e92c749ee..d26c912c6e3e59e3975c2a87103f3bcbbecec118 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -4,1095 +4,611 @@ |
#include "mojo/edk/system/message_pipe_dispatcher.h" |
-#include <stddef.h> |
-#include <stdint.h> |
- |
#include <limits> |
-#include <utility> |
-#include <vector> |
-#include "base/bind.h" |
-#include "base/debug/stack_trace.h" |
-#include "base/logging.h" |
-#include "base/message_loop/message_loop.h" |
+#include "base/macros.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/memory/scoped_ptr.h" |
#include "mojo/edk/embedder/embedder_internal.h" |
-#include "mojo/edk/embedder/platform_handle_utils.h" |
-#include "mojo/edk/embedder/platform_shared_buffer.h" |
-#include "mojo/edk/embedder/platform_support.h" |
-#include "mojo/edk/system/broker.h" |
-#include "mojo/edk/system/configuration.h" |
-#include "mojo/edk/system/message_in_transit.h" |
-#include "mojo/edk/system/options_validation.h" |
-#include "mojo/edk/system/transport_data.h" |
+#include "mojo/edk/system/core.h" |
+#include "mojo/edk/system/node_controller.h" |
+#include "mojo/edk/system/ports_message.h" |
+#include "mojo/public/c/system/macros.h" |
namespace mojo { |
namespace edk { |
namespace { |
-const uint32_t kInvalidMessagePipeHandleIndex = static_cast<uint32_t>(-1); |
- |
-struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
- MOJO_ALIGNAS(1) bool transferable; |
- MOJO_ALIGNAS(1) bool write_error; |
- MOJO_ALIGNAS(8) uint64_t pipe_id; // If transferable is false. |
- // The following members are only set if transferable is true. |
- // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
- // was closed. |
- MOJO_ALIGNAS(4) uint32_t platform_handle_index; |
- |
- MOJO_ALIGNAS(4) uint32_t |
- shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
- MOJO_ALIGNAS(4) uint32_t shared_memory_size; |
- |
- MOJO_ALIGNAS(4) uint32_t serialized_read_buffer_size; |
- MOJO_ALIGNAS(4) uint32_t serialized_write_buffer_size; |
- MOJO_ALIGNAS(4) uint32_t serialized_message_queue_size; |
- |
- // These are the FDs required as part of serializing channel_ and |
- // message_queue_. This is only used on POSIX. |
- MOJO_ALIGNAS(4) |
- uint32_t serialized_fds_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
- MOJO_ALIGNAS(4) uint32_t serialized_read_fds_length; |
- MOJO_ALIGNAS(4) uint32_t serialized_write_fds_length; |
- MOJO_ALIGNAS(4) uint32_t serialized_message_fds_length; |
+// Header attached to every message sent over a message pipe. |
+struct MOJO_ALIGNAS(8) MessageHeader { |
+ // The number of serialized dispatchers included in this header. |
+ uint32_t num_dispatchers; |
+ |
+ // Total size of the header, including serialized dispatcher data. |
+ uint32_t header_size; |
}; |
-char* SerializeBuffer(char* start, std::vector<char>* buffer) { |
- if (buffer->size()) |
- memcpy(start, &(*buffer)[0], buffer->size()); |
- return start + buffer->size(); |
-} |
+// Header for each dispatcher, immediately following the message header. |
+struct MOJO_ALIGNAS(8) DispatcherHeader { |
+ // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. |
+ int32_t type; |
-bool GetHandle(size_t index, |
- PlatformHandleVector* platform_handles, |
- ScopedPlatformHandle* handle) { |
- if (index == kInvalidMessagePipeHandleIndex) |
- return true; |
+ // The size of the serialized dispatcher, not including this header. |
+ uint32_t num_bytes; |
- if (!platform_handles || index >= platform_handles->size()) { |
- LOG(ERROR) |
- << "Invalid serialized message pipe dispatcher (missing handles)"; |
- return false; |
- } |
+ // The number of ports needed to deserialize this dispatcher. |
+ uint32_t num_ports; |
- // We take ownership of the handle, so we have to invalidate the one in |
- // |platform_handles|. |
- handle->reset((*platform_handles)[index]); |
- (*platform_handles)[index] = PlatformHandle(); |
- return true; |
-} |
+ // The number of platform handles needed to deserialize this dispatcher. |
+ uint32_t num_platform_handles; |
+}; |
-#if defined(OS_POSIX) |
-void ClosePlatformHandles(std::vector<int>* fds) { |
- for (size_t i = 0; i < fds->size(); ++i) |
- PlatformHandle((*fds)[i]).CloseIfNecessary(); |
-} |
-#endif |
+struct MOJO_ALIGNAS(8) SerializedState { |
+ uint64_t pipe_id; |
+ int8_t endpoint; |
+}; |
} // namespace |
-// MessagePipeDispatcher ------------------------------------------------------- |
+// A PortObserver which forwards to a MessagePipeDispatcher. This owns a |
+// reference to the MPD to ensure it lives as long as the observed port. |
+class MessagePipeDispatcher::PortObserverThunk |
+ : public NodeController::PortObserver { |
+ public: |
+ explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher) |
+ : dispatcher_(dispatcher) {} |
-const MojoCreateMessagePipeOptions |
- MessagePipeDispatcher::kDefaultCreateOptions = { |
- static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
+ private: |
+ ~PortObserverThunk() override {} |
-MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
- const MojoCreateMessagePipeOptions* in_options, |
- MojoCreateMessagePipeOptions* out_options) { |
- const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; |
- |
- *out_options = kDefaultCreateOptions; |
- if (!in_options) |
- return MOJO_RESULT_OK; |
- |
- UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
- if (!reader.is_valid()) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
+ // NodeController::PortObserver: |
+ void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
- if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
- return MOJO_RESULT_OK; |
- if ((reader.options().flags & ~kKnownFlags)) |
- return MOJO_RESULT_UNIMPLEMENTED; |
- out_options->flags = reader.options().flags; |
+ scoped_refptr<MessagePipeDispatcher> dispatcher_; |
- // Checks for fields beyond |flags|: |
- |
- // (Nothing here yet.) |
- |
- return MOJO_RESULT_OK; |
-} |
- |
-void MessagePipeDispatcher::Init( |
- ScopedPlatformHandle message_pipe, |
- char* serialized_read_buffer, size_t serialized_read_buffer_size, |
- char* serialized_write_buffer, size_t serialized_write_buffer_size, |
- std::vector<int>* serialized_read_fds, |
- std::vector<int>* serialized_write_fds) { |
- CHECK(transferable_); |
- if (message_pipe.get().is_valid()) { |
- channel_ = RawChannel::Create(std::move(message_pipe)); |
- |
- // TODO(jam): It's probably cleaner to pass this in Init call. |
- channel_->SetSerializedData( |
- serialized_read_buffer, serialized_read_buffer_size, |
- serialized_write_buffer, serialized_write_buffer_size, |
- serialized_read_fds, serialized_write_fds); |
- internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
- } |
-} |
- |
-void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { |
- CHECK(!transferable_); |
- pipe_id_ = pipe_id; |
-} |
- |
-void MessagePipeDispatcher::InitOnIO() { |
- base::AutoLock locker(lock()); |
- CHECK(transferable_); |
- calling_init_ = true; |
- if (channel_) |
- channel_->Init(this); |
- calling_init_ = false; |
-} |
- |
-void MessagePipeDispatcher::CloseOnIOAndRelease() { |
- { |
- base::AutoLock locker(lock()); |
- CloseOnIO(); |
- } |
- Release(); // To match CloseImplNoLock. |
-} |
+ DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
+}; |
-void MessagePipeDispatcher::CloseOnIO() { |
- lock().AssertAcquired(); |
- |
- if (channel_) { |
- // If we closed the channel now then in-flight message pipes wouldn't get |
- // closed, and their other side wouldn't get a connection error notification |
- // which could lead to hangs or leaks. So we ask the other side of this |
- // message pipe to close, which ensures that we have dispatched all |
- // in-flight message pipes. |
- DCHECK(!close_requested_); |
- close_requested_ = true; |
- AddRef(); |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
- if (!transferable_) |
- message->set_route_id(pipe_id_); |
- channel_->WriteMessage(std::move(message)); |
- return; |
- } |
+MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
+ const ports::PortRef& port, |
+ uint64_t pipe_id, |
+ int endpoint) |
+ : node_controller_(node_controller), |
+ port_(port), |
+ pipe_id_(pipe_id), |
+ endpoint_(endpoint) { |
+ DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
+ << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
- if (!transferable_ && |
- (non_transferable_state_ == CONNECT_CALLED || |
- non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { |
- if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
- RequestNontransferableChannel(); |
- |
- // We can't cancel the pending request yet, since the other side of the |
- // message pipe would want to get pending outgoing messages (if any) or at |
- // least know that this end was closed. So keep this object alive until |
- // then. |
- non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
- AddRef(); |
- } |
+ node_controller_->SetPortObserver( |
+ port_, |
+ make_scoped_refptr(new PortObserverThunk(this))); |
} |
Dispatcher::Type MessagePipeDispatcher::GetType() const { |
return Type::MESSAGE_PIPE; |
} |
-void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
- DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
- base::AutoLock locker(lock()); |
- channel_ = channel; |
- while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
- channel_->WriteMessage( |
- non_transferable_outgoing_message_queue_.GetMessage()); |
- } |
- |
- if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
- non_transferable_state_ = CONNECTED; |
- // We kept this object alive until it's connected, we can close it now. |
- CloseOnIO(); |
- // Balance the AddRef in CloseOnIO. |
- Release(); |
- return; |
- } |
- |
- non_transferable_state_ = CONNECTED; |
-} |
- |
-#if defined(OS_WIN) |
-// TODO(jam): this is copied from RawChannelWin till I figure out what's the |
-// best way we want to share this. |
-// Since this is used for serialization of messages read/written to a MP that |
-// aren't consumed by Mojo primitives yet, there could be an unbounded number of |
-// them when a MP is being sent. As a result, even for POSIX we will probably |
-// want to send the handles to the shell process and exchange them for tokens |
-// (since we can be sure that the shell will respond to our IPCs, compared to |
-// the other end where we're sending the MP to, which may not be reading...). |
-ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
- size_t num_platform_handles, |
- const void* platform_handle_table) { |
- ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector()); |
- rv->resize(num_platform_handles); |
- |
- const uint64_t* tokens = |
- static_cast<const uint64_t*>(platform_handle_table); |
- internal::g_broker->TokenToHandle(tokens, num_platform_handles, &rv->at(0)); |
- return rv.Pass(); |
+MojoResult MessagePipeDispatcher::Close() { |
+ base::AutoLock lock(signal_lock_); |
+ DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
+ << " [port=" << port_.name() << "]"; |
+ return CloseNoLock(); |
} |
-#endif |
- |
-scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
- const void* source, |
- size_t size, |
- PlatformHandleVector* platform_handles) { |
- if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { |
- LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; |
- return nullptr; |
- } |
- |
- const SerializedMessagePipeHandleDispatcher* serialization = |
- static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
- |
- scoped_refptr<MessagePipeDispatcher> rv( |
- new MessagePipeDispatcher(serialization->transferable)); |
- if (!rv->transferable_) { |
- rv->InitNonTransferable(serialization->pipe_id); |
- return rv; |
- } |
- if (serialization->shared_memory_size != |
- (serialization->serialized_read_buffer_size + |
- serialization->serialized_write_buffer_size + |
- serialization->serialized_message_queue_size)) { |
- LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; |
- return nullptr; |
- } |
+MojoResult MessagePipeDispatcher::WriteMessage( |
+ const void* bytes, |
+ uint32_t num_bytes, |
+ const DispatcherInTransit* dispatchers, |
+ uint32_t num_dispatchers, |
+ MojoWriteMessageFlags flags) { |
- ScopedPlatformHandle platform_handle, shared_memory_handle; |
- if (!GetHandle(serialization->platform_handle_index, |
- platform_handles, &platform_handle) || |
- !GetHandle(serialization->shared_memory_handle_index, |
- platform_handles, &shared_memory_handle)) { |
- return nullptr; |
- } |
+ { |
+ base::AutoLock lock(signal_lock_); |
+ if (port_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ } |
+ |
+ // A structure for retaining information about every Dispatcher we're about |
+ // to send. This information is collected by calling StartSerialize() on |
+ // each dispatcher in sequence. |
+ struct DispatcherInfo { |
+ uint32_t num_bytes; |
+ uint32_t num_ports; |
+ uint32_t num_handles; |
+ }; |
+ |
+ // This is only the base header size. It will grow as we accumulate the |
+ // size of serialized state for each dispatcher. |
+ size_t header_size = sizeof(MessageHeader) + |
+ num_dispatchers * sizeof(DispatcherHeader); |
+ |
+ size_t num_ports = 0; |
+ size_t num_handles = 0; |
+ |
+ std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); |
+ for (size_t i = 0; i < num_dispatchers; ++i) { |
+ Dispatcher* d = dispatchers[i].dispatcher.get(); |
+ d->StartSerialize(&dispatcher_info[i].num_bytes, |
+ &dispatcher_info[i].num_ports, |
+ &dispatcher_info[i].num_handles); |
+ header_size += dispatcher_info[i].num_bytes; |
+ num_ports += dispatcher_info[i].num_ports; |
+ num_handles += dispatcher_info[i].num_handles; |
+ } |
+ |
+ // We now have enough information to fully allocate the message storage. |
+ scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage( |
+ header_size + num_bytes, num_ports, num_handles); |
+ DCHECK(message); |
+ |
+ // Populate the message header with information about serialized dispatchers. |
+ // |
+ // The front of the message is always a MessageHeader followed by a |
+ // DispatcherHeader for each dispatcher to be sent. |
+ MessageHeader* header = |
+ static_cast<MessageHeader*>(message->mutable_payload_bytes()); |
+ DispatcherHeader* dispatcher_headers = |
+ reinterpret_cast<DispatcherHeader*>(header + 1); |
+ |
+ // Serialized dispatcher state immediately follows the series of |
+ // DispatcherHeaders. |
+ char* dispatcher_data = |
+ reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); |
+ |
+ header->num_dispatchers = num_dispatchers; |
+ |
+ // |header_size| is the total number of bytes preceding the message payload, |
+ // including all dispatcher headers and serialized dispatcher state. |
+ DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); |
+ header->header_size = static_cast<uint32_t>(header_size); |
+ |
+ bool cancel_transit = false; |
+ if (num_dispatchers > 0) { |
+ ScopedPlatformHandleVectorPtr handles( |
+ new PlatformHandleVector(num_handles)); |
+ size_t port_index = 0; |
+ size_t handle_index = 0; |
+ for (size_t i = 0; i < num_dispatchers; ++i) { |
+ Dispatcher* d = dispatchers[i].dispatcher.get(); |
+ DispatcherHeader* dh = &dispatcher_headers[i]; |
+ const DispatcherInfo& info = dispatcher_info[i]; |
+ |
+ // Fill in the header for this dispatcher. |
+ dh->type = static_cast<int32_t>(d->GetType()); |
+ dh->num_bytes = info.num_bytes; |
+ dh->num_ports = info.num_ports; |
+ dh->num_platform_handles = info.num_handles; |
+ |
+ // Fill in serialized state, ports, and platform handles. We'll cancel |
+ // the send if the dispatcher implementation rejects for some reason. |
+ if (!d->EndSerialize(static_cast<void*>(dispatcher_data), |
+ message->mutable_ports() + port_index, |
+ handles->data() + handle_index)) { |
+ cancel_transit = true; |
+ break; |
+ } |
- char* serialized_read_buffer = nullptr; |
- size_t serialized_read_buffer_size = 0; |
- char* serialized_write_buffer = nullptr; |
- size_t serialized_write_buffer_size = 0; |
- char* message_queue_data = nullptr; |
- size_t message_queue_size = 0; |
- scoped_refptr<PlatformSharedBuffer> shared_buffer; |
- scoped_ptr<PlatformSharedBufferMapping> mapping; |
- if (shared_memory_handle.is_valid()) { |
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
- serialization->shared_memory_size, std::move(shared_memory_handle)); |
- mapping = shared_buffer->Map(0, serialization->shared_memory_size); |
- char* buffer = static_cast<char*>(mapping->GetBase()); |
- if (serialization->serialized_read_buffer_size) { |
- serialized_read_buffer = buffer; |
- serialized_read_buffer_size = serialization->serialized_read_buffer_size; |
- buffer += serialized_read_buffer_size; |
- } |
- if (serialization->serialized_write_buffer_size) { |
- serialized_write_buffer = buffer; |
- serialized_write_buffer_size = |
- serialization->serialized_write_buffer_size; |
- buffer += serialized_write_buffer_size; |
+ dispatcher_data += info.num_bytes; |
+ port_index += info.num_ports; |
+ handle_index += info.num_handles; |
} |
- if (serialization->serialized_message_queue_size) { |
- message_queue_data = buffer; |
- message_queue_size = serialization->serialized_message_queue_size; |
- buffer += message_queue_size; |
- } |
- } |
- rv->write_error_ = serialization->write_error; |
- |
- std::vector<int> serialized_read_fds; |
- std::vector<int> serialized_write_fds; |
-#if defined(OS_POSIX) |
- std::vector<int> serialized_fds; |
- size_t serialized_fds_index = 0; |
- |
- size_t total_fd_count = serialization->serialized_read_fds_length + |
- serialization->serialized_write_fds_length + |
- serialization->serialized_message_fds_length; |
- for (size_t i = 0; i < total_fd_count; ++i) { |
- ScopedPlatformHandle handle; |
- if (!GetHandle(serialization->serialized_fds_index + i, platform_handles, |
- &handle)) { |
- ClosePlatformHandles(&serialized_fds); |
- return nullptr; |
+ if (!cancel_transit) { |
+ // Take ownership of all the handles and move them into message storage. |
+ message->SetHandles(std::move(handles)); |
+ } else { |
+ // Release any platform handles we've accumulated. Their dispatchers |
+ // retain ownership when transit is canceled, so these are not actually |
+ // leaking. |
+ handles->clear(); |
} |
- serialized_fds.push_back(handle.release().handle); |
} |
- serialized_read_fds.assign( |
- serialized_fds.begin(), |
- serialized_fds.begin() + serialization->serialized_read_fds_length); |
- serialized_fds_index += serialization->serialized_read_fds_length; |
- serialized_write_fds.assign( |
- serialized_fds.begin() + serialized_fds_index, |
- serialized_fds.begin() + serialized_fds_index + |
- serialization->serialized_write_fds_length); |
- serialized_fds_index += serialization->serialized_write_fds_length; |
-#endif |
- |
- while (message_queue_size) { |
- size_t message_size; |
- if (!MessageInTransit::GetNextMessageSize( |
- message_queue_data, message_queue_size, &message_size)) { |
- NOTREACHED() << "Couldn't read message size from serialized data."; |
- return nullptr; |
- } |
- if (message_size > message_queue_size) { |
- NOTREACHED() << "Invalid serialized message size."; |
- return nullptr; |
- } |
- MessageInTransit::View message_view(message_size, message_queue_data); |
- message_queue_size -= message_size; |
- message_queue_data += message_size; |
- |
- // TODO(jam): Copied below from RawChannelWin. See commment above |
- // GetReadPlatformHandles. |
- ScopedPlatformHandleVectorPtr temp_platform_handles; |
- if (message_view.transport_data_buffer()) { |
- size_t num_platform_handles; |
- const void* platform_handle_table; |
- TransportData::GetPlatformHandleTable( |
- message_view.transport_data_buffer(), &num_platform_handles, |
- &platform_handle_table); |
- |
- if (num_platform_handles > 0) { |
-#if defined(OS_WIN) |
- temp_platform_handles = |
- GetReadPlatformHandles(num_platform_handles, |
- platform_handle_table).Pass(); |
- if (!temp_platform_handles) { |
- LOG(ERROR) << "Invalid number of platform handles received"; |
- return nullptr; |
- } |
-#else |
- temp_platform_handles.reset(new PlatformHandleVector()); |
- for (size_t i = 0; i < num_platform_handles; ++i) |
- temp_platform_handles->push_back( |
- PlatformHandle(serialized_fds[serialized_fds_index++])); |
-#endif |
+ MojoResult result = MOJO_RESULT_OK; |
+ if (!cancel_transit) { |
+ // Copy the message body. |
+ void* message_body = static_cast<void*>( |
+ static_cast<char*>(message->mutable_payload_bytes()) + header_size); |
+ memcpy(message_body, bytes, num_bytes); |
+ |
+ int rv = node_controller_->SendMessage(port_, &message); |
+ |
+ DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
+ << " [port=" << port_.name() << "; rv=" << rv |
+ << "; num_bytes=" << num_bytes << "]"; |
+ |
+ if (rv != ports::OK) { |
+ if (rv == ports::ERROR_PORT_UNKNOWN || |
+ rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
+ rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
+ result = MOJO_RESULT_INVALID_ARGUMENT; |
+ } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
+ base::AutoLock lock(signal_lock_); |
+ awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ result = MOJO_RESULT_FAILED_PRECONDITION; |
+ } else { |
+ NOTREACHED(); |
+ result = MOJO_RESULT_UNKNOWN; |
} |
- } |
- |
- // TODO(jam): Copied below from RawChannelWin. See commment above |
- // GetReadPlatformHandles. |
- scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
- if (message_view.transport_data_buffer_size() > 0) { |
- DCHECK(message_view.transport_data_buffer()); |
- message->SetDispatchers(TransportData::DeserializeDispatchers( |
- message_view.transport_data_buffer(), |
- message_view.transport_data_buffer_size(), |
- std::move(temp_platform_handles))); |
- } |
- |
- rv->message_queue_.AddMessage(std::move(message)); |
- } |
- |
- rv->Init(std::move(platform_handle), serialized_read_buffer, |
- serialized_read_buffer_size, serialized_write_buffer, |
- serialized_write_buffer_size, &serialized_read_fds, |
- &serialized_write_fds); |
- |
- if (message_queue_size) { // Should be empty by now. |
- LOG(ERROR) << "Invalid queued messages"; |
- return nullptr; |
- } |
- |
- return rv; |
-} |
- |
-MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
- : channel_(nullptr), |
- serialized_read_fds_length_(0u), |
- serialized_write_fds_length_(0u), |
- serialized_message_fds_length_(0u), |
- pipe_id_(0), |
- non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
- serialized_(false), |
- calling_init_(false), |
- write_error_(false), |
- transferable_(transferable), |
- close_requested_(false) { |
-} |
- |
-MessagePipeDispatcher::~MessagePipeDispatcher() { |
- // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
- // exception is if they posted a task to run CloseOnIO but the IO thread shut |
- // down and so when it was deleting pending tasks it caused the last reference |
- // to destruct this object. In that case, safe to destroy the channel. |
- if (channel_ && |
- internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { |
- if (transferable_) { |
- channel_->Shutdown(); |
+ cancel_transit = true; |
} else { |
- internal::g_broker->CloseMessagePipe(pipe_id_, this); |
+ DCHECK(!message); |
} |
- } else { |
- DCHECK(!channel_); |
- } |
-#if defined(OS_POSIX) |
- ClosePlatformHandles(&serialized_fds_); |
-#endif |
-} |
- |
-void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
- lock().AssertAcquired(); |
- awakable_list_.CancelAll(); |
-} |
- |
-void MessagePipeDispatcher::CloseImplNoLock() { |
- lock().AssertAcquired(); |
- // This early circuit fixes leak in unit tests. There's nothing to do in the |
- // posted task. |
- if (!transferable_ && non_transferable_state_ == CLOSED) |
- return; |
- |
- // We take a manual refcount because at shutdown, the task below might not get |
- // a chance to execute. If that happens, the RawChannel will still call our |
- // OnError method because it always runs (since it watches thread |
- // destruction). So to avoid UAF, manually add a reference and only release it |
- // if the task runs. |
- AddRef(); |
- if (!internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, |
- base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this))) { |
- // Avoid a shutdown leak in unittests. If the thread is shutting down, |
- // we can't connect to the other end to let it know that we're closed either |
- // way. |
- if (!transferable_ && non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
- Release(); |
} |
-} |
-void MessagePipeDispatcher::SerializeInternal() { |
- serialized_ = true; |
- if (!transferable_) { |
- CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
- << "Non transferable message pipe being sent after read/write/waited. " |
- << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
- << "the pipe can be sent after it's read or written. This message pipe " |
- << "was previously bound at:\n" |
- << non_transferable_bound_stack_->ToString(); |
- |
- non_transferable_state_ = SERIALISED; |
- return; |
+ if (cancel_transit) { |
+ // We ended up not sending the message. Release all the platform handles. |
+ // Their dipatchers retain ownership when transit is canceled, so these are |
+ // not actually leaking. |
+ DCHECK(message); |
+ Channel::MessagePtr m = message->TakeChannelMessage(); |
+ ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); |
+ if (handles) |
+ handles->clear(); |
} |
- // We need to stop watching handle immediately, even though not on IO thread, |
- // so that other messages aren't read after this. |
- std::vector<int> serialized_read_fds, serialized_write_fds; |
- if (channel_) { |
- bool write_error = false; |
- |
- serialized_platform_handle_ = channel_->ReleaseHandle( |
- &serialized_read_buffer_, &serialized_write_buffer_, |
- &serialized_read_fds, &serialized_write_fds, &write_error); |
- serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), |
- serialized_read_fds.end()); |
- serialized_read_fds_length_ = serialized_read_fds.size(); |
- serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), |
- serialized_write_fds.end()); |
- serialized_write_fds_length_ = serialized_write_fds.size(); |
- channel_ = nullptr; |
- } else { |
- // It's valid that the other side wrote some data and closed its end. |
- } |
+ return result; |
+} |
- DCHECK(serialized_message_queue_.empty()); |
- while (!message_queue_.IsEmpty()) { |
- scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
- |
- // When MojoWriteMessage is called, the MessageInTransit doesn't have |
- // dispatchers set and CreateEquivaent... is called since the dispatchers |
- // can be referenced by others. here dispatchers aren't referenced by |
- // others, but rawchannel can still call to them. so since we dont call |
- // createequiv, manually call TransportStarted and TransportEnd. |
- DispatcherVector dispatchers; |
- if (message->has_dispatchers()) |
- dispatchers = *message->dispatchers(); |
- for (size_t i = 0; i < dispatchers.size(); ++i) |
- dispatchers[i]->TransportStarted(); |
- |
- // TODO(jam): this handling for dispatchers only works on windows where we |
- // send transportdata as bytes instead of as parameters to sendmsg. |
- message->SerializeAndCloseDispatchers(); |
- // cont'd below |
- |
- size_t main_buffer_size = message->main_buffer_size(); |
- size_t transport_data_buffer_size = message->transport_data() ? |
- message->transport_data()->buffer_size() : 0; |
- |
- serialized_message_queue_.insert( |
- serialized_message_queue_.end(), |
- static_cast<const char*>(message->main_buffer()), |
- static_cast<const char*>(message->main_buffer()) + main_buffer_size); |
- |
- // cont'd |
- if (transport_data_buffer_size != 0) { |
- // TODO(jam): copied from RawChannelWin::WriteNoLock( |
- PlatformHandleVector* all_platform_handles = |
- message->transport_data()->platform_handles(); |
- if (all_platform_handles) { |
-#if defined(OS_WIN) |
- uint64_t* tokens = reinterpret_cast<uint64_t*>( |
- static_cast<char*>(message->transport_data()->buffer()) + |
- message->transport_data()->platform_handle_table_offset()); |
- internal::g_broker->HandleToToken( |
- &all_platform_handles->at(0), all_platform_handles->size(), tokens); |
- for (size_t i = 0; i < all_platform_handles->size(); i++) |
- all_platform_handles->at(i) = PlatformHandle(); |
-#else |
- for (size_t i = 0; i < all_platform_handles->size(); i++) { |
- serialized_fds_.push_back(all_platform_handles->at(i).handle); |
- serialized_message_fds_length_++; |
- all_platform_handles->at(i) = PlatformHandle(); |
+MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
+ uint32_t* num_bytes, |
+ MojoHandle* handles, |
+ uint32_t* num_handles, |
+ MojoReadMessageFlags flags) { |
+ { |
+ base::AutoLock lock(signal_lock_); |
+ // We can't read from a port that's closed or in transit! |
+ if (port_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ } |
+ |
+ bool no_space = false; |
+ bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
+ |
+ // Ensure the provided buffers are large enough to hold the next message. |
+ // GetMessageIf provides an atomic way to test the next message without |
+ // committing to removing it from the port's underlying message queue until |
+ // we are sure we can consume it. |
+ |
+ ports::ScopedMessage ports_message; |
+ int rv = node_controller_->node()->GetMessageIf( |
+ port_, |
+ [num_bytes, num_handles, &no_space, &may_discard]( |
+ const ports::Message& next_message) { |
+ const PortsMessage& message = |
+ static_cast<const PortsMessage&>(next_message); |
+ DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); |
+ |
+ const MessageHeader* header = |
+ static_cast<const MessageHeader*>(message.payload_bytes()); |
+ DCHECK_LE(header->header_size, message.num_payload_bytes()); |
+ |
+ uint32_t bytes_to_read = 0; |
+ uint32_t bytes_available = |
+ static_cast<uint32_t>(message.num_payload_bytes()) - |
+ header->header_size; |
+ if (num_bytes) { |
+ bytes_to_read = std::min(*num_bytes, bytes_available); |
+ *num_bytes = bytes_available; |
} |
-#endif |
- } |
- |
- serialized_message_queue_.insert( |
- serialized_message_queue_.end(), |
- static_cast<const char*>(message->transport_data()->buffer()), |
- static_cast<const char*>(message->transport_data()->buffer()) + |
- transport_data_buffer_size); |
- } |
- for (size_t i = 0; i < dispatchers.size(); ++i) |
- dispatchers[i]->TransportEnded(); |
- } |
-} |
+ uint32_t handles_to_read = 0; |
+ uint32_t handles_available = header->num_dispatchers; |
+ if (num_handles) { |
+ handles_to_read = std::min(*num_handles, handles_available); |
+ *num_handles = handles_available; |
+ } |
-scoped_refptr<Dispatcher> |
-MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
- lock().AssertAcquired(); |
- |
- SerializeInternal(); |
- |
- scoped_refptr<MessagePipeDispatcher> rv( |
- new MessagePipeDispatcher(transferable_)); |
- rv->serialized_ = true; |
- if (transferable_) { |
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
- serialized_message_queue_.swap(rv->serialized_message_queue_); |
- serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
- serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
- serialized_fds_.swap(rv->serialized_fds_); |
- rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
- rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
- rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
- rv->write_error_ = write_error_; |
- } else { |
- rv->pipe_id_ = pipe_id_; |
- rv->non_transferable_state_ = non_transferable_state_; |
- } |
- return rv; |
-} |
+ if (bytes_to_read < bytes_available || |
+ handles_to_read < handles_available) { |
+ no_space = true; |
+ return may_discard; |
+ } |
-MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
- const void* bytes, |
- uint32_t num_bytes, |
- std::vector<DispatcherTransport>* transports, |
- MojoWriteMessageFlags flags) { |
- lock().AssertAcquired(); |
+ return true; |
+ }, |
+ &ports_message); |
- DCHECK(!transports || |
- (transports->size() > 0 && |
- transports->size() <= GetConfiguration().max_message_num_handles)); |
+ if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
+ if (rv == ports::ERROR_PORT_UNKNOWN || |
+ rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
- if (write_error_ || |
- (transferable_ && !channel_) || |
- (!transferable_ && non_transferable_state_ == CLOSED)) { |
- return MOJO_RESULT_FAILED_PRECONDITION; |
+ NOTREACHED(); |
+ return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
} |
- if (num_bytes > GetConfiguration().max_message_num_bytes) |
+ if (no_space) { |
+ // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this |
+ // message's data. The message will still be in queue unless |
+ // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
return MOJO_RESULT_RESOURCE_EXHAUSTED; |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
- if (transports) { |
- MojoResult result = AttachTransportsNoLock(message.get(), transports); |
- if (result != MOJO_RESULT_OK) |
- return result; |
- } |
- |
- message->SerializeAndCloseDispatchers(); |
- if (!transferable_) |
- message->set_route_id(pipe_id_); |
- if (!transferable_ && |
- (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
- non_transferable_state_ == CONNECT_CALLED)) { |
- if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
- RequestNontransferableChannel(); |
- non_transferable_outgoing_message_queue_.AddMessage(std::move(message)); |
- } else { |
- channel_->WriteMessage(std::move(message)); |
} |
- return MOJO_RESULT_OK; |
-} |
+ if (!ports_message) { |
+ // No message was available in queue. |
-MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
- void* bytes, |
- uint32_t* num_bytes, |
- DispatcherVector* dispatchers, |
- uint32_t* num_dispatchers, |
- MojoReadMessageFlags flags) { |
- lock().AssertAcquired(); |
- if (transferable_ && channel_) { |
- channel_->EnsureLazyInitialized(); |
- } else if (!transferable_) { |
- if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
- RequestNontransferableChannel(); |
- return MOJO_RESULT_SHOULD_WAIT; |
- } else if (non_transferable_state_ == CONNECT_CALLED) { |
+ if (rv == ports::OK) |
return MOJO_RESULT_SHOULD_WAIT; |
- } |
- } |
- DCHECK(!dispatchers || dispatchers->empty()); |
- |
- const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
- const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
- |
- if (message_queue_.IsEmpty()) |
- return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; |
- |
- // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
- // and release the lock immediately. |
- bool enough_space = true; |
- MessageInTransit* message = message_queue_.PeekMessage(); |
- if (num_bytes) |
- *num_bytes = message->num_bytes(); |
- if (message->num_bytes() <= max_bytes) |
- memcpy(bytes, message->bytes(), message->num_bytes()); |
- else |
- enough_space = false; |
- |
- if (DispatcherVector* queued_dispatchers = message->dispatchers()) { |
- if (num_dispatchers) |
- *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); |
- if (enough_space) { |
- if (queued_dispatchers->empty()) { |
- // Nothing to do. |
- } else if (queued_dispatchers->size() <= max_num_dispatchers) { |
- DCHECK(dispatchers); |
- dispatchers->swap(*queued_dispatchers); |
- } else { |
- enough_space = false; |
- } |
- } |
- } else { |
- if (num_dispatchers) |
- *num_dispatchers = 0; |
+ // Peer is closed and there are no more messages to read. |
+ DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
+ base::AutoLock lock(signal_lock_); |
+ awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
} |
- message = nullptr; |
- |
- if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
- message_queue_.DiscardMessage(); |
- |
- // Now it's empty, thus no longer readable. |
- if (message_queue_.IsEmpty()) { |
- // It's currently not possible to wait for non-readability, but we should |
- // do the state change anyway. |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ // Alright! We have a message and the caller has provided sufficient storage |
+ // in which to receive it. |
+ |
+ scoped_ptr<PortsMessage> message( |
+ static_cast<PortsMessage*>(ports_message.release())); |
+ |
+ const MessageHeader* header = |
+ static_cast<const MessageHeader*>(message->payload_bytes()); |
+ const DispatcherHeader* dispatcher_headers = |
+ reinterpret_cast<const DispatcherHeader*>(header + 1); |
+ |
+ const char* dispatcher_data = reinterpret_cast<const char*>( |
+ dispatcher_headers + header->num_dispatchers); |
+ |
+ // Deserialize dispatchers. |
+ if (header->num_dispatchers > 0) { |
+ CHECK(handles); |
+ std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); |
+ size_t data_payload_index = sizeof(MessageHeader) + |
+ header->num_dispatchers * sizeof(DispatcherHeader); |
+ size_t port_index = 0; |
+ size_t platform_handle_index = 0; |
+ for (size_t i = 0; i < header->num_dispatchers; ++i) { |
+ const DispatcherHeader& dh = dispatcher_headers[i]; |
+ Type type = static_cast<Type>(dh.type); |
+ |
+ DCHECK_GE(message->num_payload_bytes(), |
+ data_payload_index + dh.num_bytes); |
+ DCHECK_GE(message->num_ports(), |
+ port_index + dh.num_ports); |
+ DCHECK_GE(message->num_handles(), |
+ platform_handle_index + dh.num_platform_handles); |
+ |
+ PlatformHandle* out_handles = |
+ message->num_handles() ? message->handles() + platform_handle_index |
+ : nullptr; |
+ dispatchers[i].dispatcher = Dispatcher::Deserialize( |
+ type, dispatcher_data, dh.num_bytes, message->ports() + port_index, |
+ dh.num_ports, out_handles, dh.num_platform_handles); |
+ if (!dispatchers[i].dispatcher) |
+ return MOJO_RESULT_UNKNOWN; |
+ |
+ dispatcher_data += dh.num_bytes; |
+ data_payload_index += dh.num_bytes; |
+ port_index += dh.num_ports; |
+ platform_handle_index += dh.num_platform_handles; |
} |
+ |
+ if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers, |
+ handles)) |
+ return MOJO_RESULT_UNKNOWN; |
} |
- if (!enough_space) |
- return MOJO_RESULT_RESOURCE_EXHAUSTED; |
+ // Copy message bytes. |
+ DCHECK_GE(message->num_payload_bytes(), header->header_size); |
+ const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); |
+ memcpy(bytes, payload + header->header_size, |
+ message->num_payload_bytes() - header->header_size); |
return MOJO_RESULT_OK; |
} |
-HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
- const { |
- lock().AssertAcquired(); |
- |
- HandleSignalsState rv; |
- if (!message_queue_.IsEmpty()) |
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
- if (!message_queue_.IsEmpty() || |
- (transferable_ && channel_) || |
- (!transferable_ && non_transferable_state_ != CLOSED)) |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
- if (!write_error_ && |
- ((transferable_ && channel_) || |
- (!transferable_ && non_transferable_state_ != CLOSED))) { |
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
- } |
- if (write_error_ || |
- (transferable_ && !channel_) || |
- (!transferable_ && |
- ((non_transferable_state_ == CLOSED) || is_closed()))) { |
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
- } |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
- return rv; |
+HandleSignalsState |
+MessagePipeDispatcher::GetHandleSignalsState() const { |
+ base::AutoLock lock(signal_lock_); |
+ return GetHandleSignalsStateNoLock(); |
} |
-MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
+MojoResult MessagePipeDispatcher::AddAwakable( |
Awakable* awakable, |
MojoHandleSignals signals, |
uintptr_t context, |
HandleSignalsState* signals_state) { |
- lock().AssertAcquired(); |
- if (transferable_ && channel_) { |
- channel_->EnsureLazyInitialized(); |
- } else if (!transferable_ && |
- non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
- RequestNontransferableChannel(); |
+ base::AutoLock lock(signal_lock_); |
+ |
+ if (port_closed_ || in_transit_) { |
+ if (signals_state) |
+ *signals_state = HandleSignalsState(); |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ |
+ DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint " |
+ << endpoint_ << " [awakable=" << awakable << "; port=" |
+ << port_.name() << "; signals=" << signals << "; satisfied=" |
+ << state.satisfied_signals << "; satisfiable=" |
+ << state.satisfiable_signals << "]"; |
+ |
if (state.satisfies(signals)) { |
if (signals_state) |
*signals_state = state; |
+ DVLOG(2) << "Signals already set for " << port_.name(); |
return MOJO_RESULT_ALREADY_EXISTS; |
} |
if (!state.can_satisfy(signals)) { |
if (signals_state) |
*signals_state = state; |
+ DVLOG(2) << "Signals impossible to satisfy for " << port_.name(); |
return MOJO_RESULT_FAILED_PRECONDITION; |
} |
- awakable_list_.Add(awakable, signals, context); |
+ DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint " |
+ << endpoint_ << " [awakable=" << awakable << "; port=" |
+ << port_.name() << "; signals=" << signals << "]"; |
+ |
+ awakables_.Add(awakable, signals, context); |
return MOJO_RESULT_OK; |
} |
-void MessagePipeDispatcher::RemoveAwakableImplNoLock( |
- Awakable* awakable, |
- HandleSignalsState* signals_state) { |
- lock().AssertAcquired(); |
+void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable, |
+ HandleSignalsState* signals_state) { |
+ base::AutoLock lock(signal_lock_); |
+ if (port_closed_ || in_transit_) { |
+ if (signals_state) |
+ *signals_state = HandleSignalsState(); |
+ } else if (signals_state) { |
+ *signals_state = GetHandleSignalsStateNoLock(); |
+ } |
+ |
+ DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint " |
+ << endpoint_ << " [awakable=" << awakable << "; port=" |
+ << port_.name() << "]"; |
- awakable_list_.Remove(awakable); |
- if (signals_state) |
- *signals_state = GetHandleSignalsStateImplNoLock(); |
+ awakables_.Remove(awakable); |
} |
-void MessagePipeDispatcher::StartSerializeImplNoLock( |
- size_t* max_size, |
- size_t* max_platform_handles) { |
- if (!serialized_) |
- SerializeInternal(); |
- |
- *max_platform_handles = 0; |
- if (serialized_platform_handle_.is_valid()) |
- (*max_platform_handles)++; |
- if (!serialized_read_buffer_.empty() || |
- !serialized_write_buffer_.empty() || |
- !serialized_message_queue_.empty()) |
- (*max_platform_handles)++; |
- *max_platform_handles += serialized_fds_.size(); |
- *max_size = sizeof(SerializedMessagePipeHandleDispatcher); |
+void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes, |
+ uint32_t* num_ports, |
+ uint32_t* num_handles) { |
+ *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); |
+ *num_ports = 1; |
+ *num_handles = 0; |
} |
-bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
- void* destination, |
- size_t* actual_size, |
- PlatformHandleVector* platform_handles) { |
- CloseImplNoLock(); |
- SerializedMessagePipeHandleDispatcher* serialization = |
- static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
- serialization->transferable = transferable_; |
- serialization->pipe_id = pipe_id_; |
- if (serialized_platform_handle_.is_valid()) { |
- DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
- serialization->platform_handle_index = |
- static_cast<uint32_t>(platform_handles->size()); |
- platform_handles->push_back(serialized_platform_handle_.release()); |
- } else { |
- serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
- } |
+bool MessagePipeDispatcher::EndSerialize(void* destination, |
+ ports::PortName* ports, |
+ PlatformHandle* handles) { |
+ SerializedState* state = static_cast<SerializedState*>(destination); |
+ state->pipe_id = pipe_id_; |
+ state->endpoint = static_cast<int8_t>(endpoint_); |
+ ports[0] = port_.name(); |
+ return true; |
+} |
- serialization->write_error = write_error_; |
- DCHECK(serialized_read_buffer_.size() < std::numeric_limits<uint32_t>::max()); |
- serialization->serialized_read_buffer_size = |
- static_cast<uint32_t>(serialized_read_buffer_.size()); |
- DCHECK(serialized_write_buffer_.size() < |
- std::numeric_limits<uint32_t>::max()); |
- serialization->serialized_write_buffer_size = |
- static_cast<uint32_t>(serialized_write_buffer_.size()); |
- DCHECK(serialized_message_queue_.size() < |
- std::numeric_limits<uint32_t>::max()); |
- serialization->serialized_message_queue_size = |
- static_cast<uint32_t>(serialized_message_queue_.size()); |
- |
- serialization->shared_memory_size = static_cast<uint32_t>( |
- serialization->serialized_read_buffer_size + |
- serialization->serialized_write_buffer_size + |
- serialization->serialized_message_queue_size); |
- if (serialization->shared_memory_size) { |
- scoped_refptr<PlatformSharedBuffer> shared_buffer( |
- internal::g_platform_support->CreateSharedBuffer( |
- serialization->shared_memory_size)); |
- scoped_ptr<PlatformSharedBufferMapping> mapping( |
- shared_buffer->Map(0, serialization->shared_memory_size)); |
- char* start = static_cast<char*>(mapping->GetBase()); |
- start = SerializeBuffer(start, &serialized_read_buffer_); |
- start = SerializeBuffer(start, &serialized_write_buffer_); |
- start = SerializeBuffer(start, &serialized_message_queue_); |
- |
- DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
- serialization->shared_memory_handle_index = |
- static_cast<uint32_t>(platform_handles->size()); |
- platform_handles->push_back(shared_buffer->PassPlatformHandle().release()); |
- } else { |
- serialization->shared_memory_handle_index = kInvalidMessagePipeHandleIndex; |
- } |
+bool MessagePipeDispatcher::BeginTransit() { |
+ base::AutoLock lock(signal_lock_); |
+ if (in_transit_) |
+ return false; |
+ in_transit_ = true; |
+ return in_transit_; |
+} |
- DCHECK(serialized_read_fds_length_ < std::numeric_limits<uint32_t>::max()); |
- serialization->serialized_read_fds_length = |
- static_cast<uint32_t>(serialized_read_fds_length_); |
- DCHECK(serialized_write_fds_length_ < std::numeric_limits<uint32_t>::max()); |
- serialization->serialized_write_fds_length = |
- static_cast<uint32_t>(serialized_write_fds_length_); |
- DCHECK(serialized_message_fds_length_ < std::numeric_limits<uint32_t>::max()); |
- serialization->serialized_message_fds_length = |
- static_cast<uint32_t>(serialized_message_fds_length_); |
- if (serialized_fds_.empty()) { |
- serialization->serialized_fds_index = kInvalidMessagePipeHandleIndex; |
- } else { |
-#if defined(OS_POSIX) |
- DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
- serialization->serialized_fds_index = |
- static_cast<uint32_t>(platform_handles->size()); |
- for (size_t i = 0; i < serialized_fds_.size(); ++i) |
- platform_handles->push_back(PlatformHandle(serialized_fds_[i])); |
- serialized_fds_.clear(); |
-#endif |
- } |
+void MessagePipeDispatcher::CompleteTransitAndClose() { |
+ node_controller_->SetPortObserver(port_, nullptr); |
- *actual_size = sizeof(SerializedMessagePipeHandleDispatcher); |
- return true; |
+ base::AutoLock lock(signal_lock_); |
+ in_transit_ = false; |
+ port_transferred_ = true; |
+ CloseNoLock(); |
} |
-void MessagePipeDispatcher::TransportStarted() { |
- started_transport_.Acquire(); |
+void MessagePipeDispatcher::CancelTransit() { |
+ base::AutoLock lock(signal_lock_); |
+ in_transit_ = false; |
+ |
+ // Something may have happened while we were waiting for potential transit. |
+ awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
} |
-void MessagePipeDispatcher::TransportEnded() { |
- started_transport_.Release(); |
+// static |
+scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
+ const void* data, |
+ size_t num_bytes, |
+ const ports::PortName* ports, |
+ size_t num_ports, |
+ PlatformHandle* handles, |
+ size_t num_handles) { |
+ if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) |
+ return nullptr; |
+ |
+ const SerializedState* state = static_cast<const SerializedState*>(data); |
- base::AutoLock locker(lock()); |
+ ports::PortRef port; |
+ CHECK_EQ( |
+ ports::OK, |
+ internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); |
- // If transporting of MPD failed, we might have got more data and didn't |
- // awake for. |
- // TODO(jam): should we care about only alerting if it was empty before |
- // TransportStarted? |
- if (!message_queue_.IsEmpty()) |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, |
+ state->pipe_id, state->endpoint); |
} |
-void MessagePipeDispatcher::OnReadMessage( |
- const MessageInTransit::View& message_view, |
- ScopedPlatformHandleVectorPtr platform_handles) { |
- scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
- if (message_view.transport_data_buffer_size() > 0) { |
- DCHECK(message_view.transport_data_buffer()); |
- message->SetDispatchers(TransportData::DeserializeDispatchers( |
- message_view.transport_data_buffer(), |
- message_view.transport_data_buffer_size(), |
- std::move(platform_handles))); |
- } |
- |
- bool call_release = false; |
- if (started_transport_.Try()) { |
- // We're not in the middle of being sent. |
+MessagePipeDispatcher::~MessagePipeDispatcher() { |
+ DCHECK(port_closed_ && !in_transit_); |
+} |
- // Can get synchronously called back in Init if there was initial data. |
- scoped_ptr<base::AutoLock> locker; |
- if (!calling_init_) |
- locker.reset(new base::AutoLock(lock())); |
+MojoResult MessagePipeDispatcher::CloseNoLock() { |
+ signal_lock_.AssertAcquired(); |
+ if (port_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
- if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
- if (transferable_) { |
- channel_->Shutdown(); |
- } else { |
- internal::g_broker->CloseMessagePipe(pipe_id_, this); |
- non_transferable_state_ = CLOSED; |
- } |
- channel_ = nullptr; |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
- if (close_requested_) { |
- // We requested the other side to close the connection while they also |
- // did the same. We must balance out the AddRef in CloseOnIO to ensure |
- // this object isn't leaked. |
- call_release = true; |
- } |
- } else { |
- bool was_empty = message_queue_.IsEmpty(); |
- message_queue_.AddMessage(std::move(message)); |
- if (was_empty) |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
- } |
+ port_closed_ = true; |
+ awakables_.CancelAll(); |
- started_transport_.Release(); |
- } else { |
- if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
- // We got a request to shutdown the channel but this object is already |
- // calling into channel to serialize it. Since all the other side cares |
- // about is flushing pending messages, we bounce the quit back to it. |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
- channel_->WriteMessage(std::move(message)); |
- } else { |
- // If RawChannel is calling OnRead, that means it has its read_lock_ |
- // acquired. That means StartSerialize can't be accessing message queue as |
- // it waits on ReleaseHandle first which acquires read_lock_. |
- message_queue_.AddMessage(std::move(message)); |
- } |
+ if (!port_transferred_) { |
+ base::AutoUnlock unlock(signal_lock_); |
+ node_controller_->ClosePort(port_); |
} |
- if (call_release) |
- Release(); |
+ return MOJO_RESULT_OK; |
} |
-void MessagePipeDispatcher::OnError(Error error) { |
- // If there's a read error, then the other side of the pipe is closed. By |
- // definition, we can't write since there's no one to read it. And we can't |
- // read anymore, since we just got a read erorr. So we close the pipe. |
- // If there's a write error, then we stop writing. But we keep the pipe open |
- // until we finish reading everything in it. This is because it's valid for |
- // one endpoint to write some data and close their pipe immediately. Even |
- // though the other end can't write anymore, it should still get all the data. |
- switch (error) { |
- case ERROR_READ_SHUTDOWN: |
- // The other side was cleanly closed, so this isn't actually an error. |
- DVLOG(1) << "MessagePipeDispatcher read error (shutdown)"; |
- break; |
- case ERROR_READ_BROKEN: |
- DVLOG(1) << "MessagePipeDispatcher read error (connection broken)"; |
- break; |
- case ERROR_READ_BAD_MESSAGE: |
- // Receiving a bad message means either a bug, data corruption, or |
- // malicious attack (probably due to some other bug). |
- LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)"; |
- break; |
- case ERROR_READ_UNKNOWN: |
- LOG(ERROR) << "MessagePipeDispatcher read error (unknown)"; |
- break; |
- case ERROR_WRITE: |
- // Write errors are slightly notable: they probably shouldn't happen under |
- // normal operation (but maybe the other side crashed). |
- LOG(WARNING) << "MessagePipeDispatcher write error"; |
- DCHECK_EQ(write_error_, false) << "Should only get one write error."; |
- write_error_ = true; |
- break; |
+HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
+ HandleSignalsState rv; |
+ |
+ ports::PortStatus port_status; |
+ if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) { |
+ CHECK(in_transit_ || port_transferred_ || port_closed_); |
+ return HandleSignalsState(); |
} |
- bool call_release = false; |
- if (started_transport_.Try()) { |
- base::AutoLock locker(lock()); |
- if (channel_ && error != ERROR_WRITE) { |
- if (transferable_) { |
- channel_->Shutdown(); |
- } else { |
- CHECK_NE(non_transferable_state_, CLOSED); |
- internal::g_broker->CloseMessagePipe(pipe_id_, this); |
- non_transferable_state_ = CLOSED; |
- } |
- channel_ = nullptr; |
- if (close_requested_) { |
- // Balance AddRef in CloseOnIO. |
- call_release = true; |
- } |
- } else if (!channel_ && !transferable_ && |
- non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
- // Balance AddRef in CloseOnIO. |
- call_release = true; |
- } |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
- started_transport_.Release(); |
+ if (port_status.has_messages) { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ } |
+ if (port_status.receiving_messages) |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ if (!port_status.peer_closed) { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
} else { |
- // We must be waiting to call ReleaseHandle. It will call Shutdown. |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
} |
- |
- if (call_release) |
- Release(); |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ return rv; |
} |
-MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
- MessageInTransit* message, |
- std::vector<DispatcherTransport>* transports) { |
- DCHECK(!message->has_dispatchers()); |
- |
- // You're not allowed to send either handle to a message pipe over the message |
- // pipe, so check for this. (The case of trying to write a handle to itself is |
- // taken care of by |Core|. That case kind of makes sense, but leads to |
- // complications if, e.g., both sides try to do the same thing with their |
- // respective handles simultaneously. The other case, of trying to write the |
- // peer handle to a handle, doesn't make sense -- since no handle will be |
- // available to read the message from.) |
- for (size_t i = 0; i < transports->size(); i++) { |
- if (!(*transports)[i].is_valid()) |
- continue; |
- if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { |
- MessagePipeDispatcher* mp = |
- static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); |
- if (transferable_ && mp->transferable_ && |
- channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { |
- // The other case should have been disallowed by |Core|. (Note: |port| |
- // is the peer port of the handle given to |WriteMessage()|.) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- } else if (!transferable_ && !mp->transferable_ && |
- pipe_id_ == mp->pipe_id_) { |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- } |
- } |
- } |
+void MessagePipeDispatcher::OnPortStatusChanged() { |
+ base::AutoLock lock(signal_lock_); |
- // Clone the dispatchers and attach them to the message. (This must be done as |
- // a separate loop, since we want to leave the dispatchers alone on failure.) |
- scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
- dispatchers->reserve(transports->size()); |
- for (size_t i = 0; i < transports->size(); i++) { |
- if ((*transports)[i].is_valid()) { |
- dispatchers->push_back( |
- (*transports)[i].CreateEquivalentDispatcherAndClose()); |
- } else { |
- LOG(WARNING) << "Enqueueing null dispatcher"; |
- dispatchers->push_back(nullptr); |
- } |
- } |
- message->SetDispatchers(std::move(dispatchers)); |
- return MOJO_RESULT_OK; |
-} |
+ // We stop observing our port as soon as it's transferred, but this can race |
+ // with events which are raised right before that happens. This is fine to |
+ // ignore. |
+ if (port_transferred_) |
+ return; |
-void MessagePipeDispatcher::RequestNontransferableChannel() { |
- lock().AssertAcquired(); |
- CHECK(!transferable_); |
- CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
- non_transferable_state_ = CONNECT_CALLED; |
-#if !defined(OFFICIAL_BUILD) |
- non_transferable_bound_stack_.reset(new base::debug::StackTrace); |
+#if !defined(NDEBUG) |
+ ports::PortStatus port_status; |
+ node_controller_->node()->GetStatus(port_, &port_status); |
+ if (port_status.has_messages) { |
+ ports::ScopedMessage unused; |
+ size_t message_size = 0; |
+ node_controller_->node()->GetMessageIf( |
+ port_, [&message_size](const ports::Message& message) { |
+ message_size = message.num_payload_bytes(); |
+ return false; |
+ }, &unused); |
+ DVLOG(1) << "New message detected on message pipe " << pipe_id_ |
+ << " endpoint " << endpoint_ << " [port=" << port_.name() |
+ << "; size=" << message_size << "]"; |
+ } |
+ if (port_status.peer_closed) { |
+ DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
+ << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
+ } |
#endif |
- // PostTask since the broker can call us back synchronously. |
- internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, |
- base::Bind(&Broker::ConnectMessagePipe, |
- base::Unretained(internal::g_broker), pipe_id_, |
- base::Unretained(this))); |
+ awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
} |
} // namespace edk |