| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index b90f3b795f7ced23dd4141bb187c4180b21abcac..fec45e02ddc34029cc499a454e9113ac61e069b4 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -11,6 +11,7 @@
|
| #include "base/memory/scoped_ptr.h"
|
| #include "mojo/edk/embedder/embedder_internal.h"
|
| #include "mojo/edk/system/core.h"
|
| +#include "mojo/edk/system/message_for_transit.h"
|
| #include "mojo/edk/system/node_controller.h"
|
| #include "mojo/edk/system/ports_message.h"
|
| #include "mojo/edk/system/request_context.h"
|
| @@ -20,36 +21,10 @@ namespace edk {
|
|
|
| namespace {
|
|
|
| -#pragma pack(push, 1)
|
| -
|
| -// Header attached to every message sent over a message pipe.
|
| -struct MessageHeader {
|
| - // The number of serialized dispatchers included in this header.
|
| - uint32_t num_dispatchers;
|
| -
|
| - // Total size of the header, including serialized dispatcher data.
|
| - uint32_t header_size;
|
| -};
|
| -
|
| -static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size.");
|
| -
|
| -// Header for each dispatcher, immediately following the message header.
|
| -struct DispatcherHeader {
|
| - // The type of the dispatcher, correpsonding to the Dispatcher::Type enum.
|
| - int32_t type;
|
| -
|
| - // The size of the serialized dispatcher, not including this header.
|
| - uint32_t num_bytes;
|
| +using DispatcherHeader = MessageForTransit::DispatcherHeader;
|
| +using MessageHeader = MessageForTransit::MessageHeader;
|
|
|
| - // The number of ports needed to deserialize this dispatcher.
|
| - uint32_t num_ports;
|
| -
|
| - // The number of platform handles needed to deserialize this dispatcher.
|
| - uint32_t num_platform_handles;
|
| -};
|
| -
|
| -static_assert(sizeof(DispatcherHeader) % 8 == 0,
|
| - "Invalid DispatcherHeader size.");
|
| +#pragma pack(push, 1)
|
|
|
| struct SerializedState {
|
| uint64_t pipe_id;
|
| @@ -157,163 +132,46 @@ MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
|
| }
|
|
|
| MojoResult MessagePipeDispatcher::WriteMessage(
|
| - const void* bytes,
|
| - uint32_t num_bytes,
|
| - const DispatcherInTransit* dispatchers,
|
| - uint32_t num_dispatchers,
|
| + std::unique_ptr<MessageForTransit> message,
|
| MojoWriteMessageFlags flags) {
|
|
|
|
|
| if (port_closed_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - // A structure for retaining information about every Dispatcher we're about
|
| - // to send. This information is collected by calling StartSerialize() on
|
| - // each dispatcher in sequence.
|
| - struct DispatcherInfo {
|
| - uint32_t num_bytes;
|
| - uint32_t num_ports;
|
| - uint32_t num_handles;
|
| - };
|
| -
|
| - // This is only the base header size. It will grow as we accumulate the
|
| - // size of serialized state for each dispatcher.
|
| - size_t header_size = sizeof(MessageHeader) +
|
| - num_dispatchers * sizeof(DispatcherHeader);
|
| -
|
| - size_t num_ports = 0;
|
| - size_t num_handles = 0;
|
| -
|
| - std::vector<DispatcherInfo> dispatcher_info(num_dispatchers);
|
| - for (size_t i = 0; i < num_dispatchers; ++i) {
|
| - Dispatcher* d = dispatchers[i].dispatcher.get();
|
| - d->StartSerialize(&dispatcher_info[i].num_bytes,
|
| - &dispatcher_info[i].num_ports,
|
| - &dispatcher_info[i].num_handles);
|
| - header_size += dispatcher_info[i].num_bytes;
|
| - num_ports += dispatcher_info[i].num_ports;
|
| - num_handles += dispatcher_info[i].num_handles;
|
| - }
|
| -
|
| - // We now have enough information to fully allocate the message storage.
|
| - scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage(
|
| - header_size + num_bytes, num_ports, num_handles);
|
| - DCHECK(message);
|
| + size_t num_bytes = message->num_bytes();
|
| + std::unique_ptr<PortsMessage> msg = message->TakePortsMessage();
|
| + int rv = node_controller_->SendMessage(port_, &msg);
|
|
|
| - // Populate the message header with information about serialized dispatchers.
|
| - //
|
| - // The front of the message is always a MessageHeader followed by a
|
| - // DispatcherHeader for each dispatcher to be sent.
|
| - MessageHeader* header =
|
| - static_cast<MessageHeader*>(message->mutable_payload_bytes());
|
| - DispatcherHeader* dispatcher_headers =
|
| - reinterpret_cast<DispatcherHeader*>(header + 1);
|
| -
|
| - // Serialized dispatcher state immediately follows the series of
|
| - // DispatcherHeaders.
|
| - char* dispatcher_data =
|
| - reinterpret_cast<char*>(dispatcher_headers + num_dispatchers);
|
| -
|
| - header->num_dispatchers = num_dispatchers;
|
| -
|
| - // |header_size| is the total number of bytes preceding the message payload,
|
| - // including all dispatcher headers and serialized dispatcher state.
|
| - DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max());
|
| - header->header_size = static_cast<uint32_t>(header_size);
|
| -
|
| - bool cancel_transit = false;
|
| - if (num_dispatchers > 0) {
|
| - ScopedPlatformHandleVectorPtr handles(
|
| - new PlatformHandleVector(num_handles));
|
| - size_t port_index = 0;
|
| - size_t handle_index = 0;
|
| - for (size_t i = 0; i < num_dispatchers; ++i) {
|
| - Dispatcher* d = dispatchers[i].dispatcher.get();
|
| - DispatcherHeader* dh = &dispatcher_headers[i];
|
| - const DispatcherInfo& info = dispatcher_info[i];
|
| -
|
| - // Fill in the header for this dispatcher.
|
| - dh->type = static_cast<int32_t>(d->GetType());
|
| - dh->num_bytes = info.num_bytes;
|
| - dh->num_ports = info.num_ports;
|
| - dh->num_platform_handles = info.num_handles;
|
| -
|
| - // Fill in serialized state, ports, and platform handles. We'll cancel
|
| - // the send if the dispatcher implementation rejects for some reason.
|
| - if (!d->EndSerialize(static_cast<void*>(dispatcher_data),
|
| - message->mutable_ports() + port_index,
|
| - handles->data() + handle_index)) {
|
| - cancel_transit = true;
|
| - break;
|
| - }
|
| -
|
| - dispatcher_data += info.num_bytes;
|
| - port_index += info.num_ports;
|
| - handle_index += info.num_handles;
|
| - }
|
| + DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
|
| + << " [port=" << port_.name() << "; rv=" << rv
|
| + << "; num_bytes=" << num_bytes << "]";
|
|
|
| - if (!cancel_transit) {
|
| - // Take ownership of all the handles and move them into message storage.
|
| - message->SetHandles(std::move(handles));
|
| - } else {
|
| - // Release any platform handles we've accumulated. Their dispatchers
|
| - // retain ownership when transit is canceled, so these are not actually
|
| - // leaking.
|
| - handles->clear();
|
| - }
|
| - }
|
| -
|
| - MojoResult result = MOJO_RESULT_OK;
|
| - if (!cancel_transit) {
|
| - // Copy the message body.
|
| - void* message_body = static_cast<void*>(
|
| - static_cast<char*>(message->mutable_payload_bytes()) + header_size);
|
| - memcpy(message_body, bytes, num_bytes);
|
| -
|
| - int rv = node_controller_->SendMessage(port_, &message);
|
| -
|
| - DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
|
| - << " [port=" << port_.name() << "; rv=" << rv
|
| - << "; num_bytes=" << num_bytes << "]";
|
| -
|
| - if (rv != ports::OK) {
|
| - if (rv == ports::ERROR_PORT_UNKNOWN ||
|
| - rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
|
| - rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
|
| - result = MOJO_RESULT_INVALID_ARGUMENT;
|
| - } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
|
| - base::AutoLock lock(signal_lock_);
|
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| - result = MOJO_RESULT_FAILED_PRECONDITION;
|
| - } else {
|
| - NOTREACHED();
|
| - result = MOJO_RESULT_UNKNOWN;
|
| - }
|
| - cancel_transit = true;
|
| - } else {
|
| - DCHECK(!message);
|
| + if (rv != ports::OK) {
|
| + if (rv == ports::ERROR_PORT_UNKNOWN ||
|
| + rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
|
| + rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
|
| + base::AutoLock lock(signal_lock_);
|
| + awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| }
|
| - }
|
|
|
| - if (cancel_transit) {
|
| - // We ended up not sending the message. Release all the platform handles.
|
| - // Their dipatchers retain ownership when transit is canceled, so these are
|
| - // not actually leaking.
|
| - DCHECK(message);
|
| - Channel::MessagePtr m = message->TakeChannelMessage();
|
| - ScopedPlatformHandleVectorPtr handles = m->TakeHandles();
|
| - if (handles)
|
| - handles->clear();
|
| + NOTREACHED();
|
| + return MOJO_RESULT_UNKNOWN;
|
| }
|
|
|
| - return result;
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| -MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| - uint32_t* num_bytes,
|
| - MojoHandle* handles,
|
| - uint32_t* num_handles,
|
| - MojoReadMessageFlags flags) {
|
| +MojoResult MessagePipeDispatcher::ReadMessage(
|
| + std::unique_ptr<MessageForTransit>* message,
|
| + uint32_t* num_bytes,
|
| + MojoHandle* handles,
|
| + uint32_t* num_handles,
|
| + MojoReadMessageFlags flags,
|
| + bool read_any_size) {
|
| // We can't read from a port that's closed or in transit!
|
| if (port_closed_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| @@ -321,15 +179,17 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| bool no_space = false;
|
| bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
|
|
|
| - // Ensure the provided buffers are large enough to hold the next message.
|
| - // GetMessageIf provides an atomic way to test the next message without
|
| - // committing to removing it from the port's underlying message queue until
|
| - // we are sure we can consume it.
|
| + // Grab a message if the provided handles buffer is large enough. If the input
|
| + // |num_bytes| is provided and |read_any_size| is false, we also ensure
|
| + // that it specifies a size at least as large as the next available payload.
|
| + //
|
| + // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
|
| + // This flag exists to support both new and old API behavior.
|
|
|
| ports::ScopedMessage ports_message;
|
| int rv = node_controller_->node()->GetMessageIf(
|
| port_,
|
| - [num_bytes, num_handles, &no_space, &may_discard](
|
| + [read_any_size, num_bytes, num_handles, &no_space, &may_discard](
|
| const ports::Message& next_message) {
|
| const PortsMessage& message =
|
| static_cast<const PortsMessage&>(next_message);
|
| @@ -355,8 +215,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| *num_handles = handles_available;
|
| }
|
|
|
| - if (bytes_to_read < bytes_available ||
|
| - handles_to_read < handles_available) {
|
| + if (handles_to_read < handles_available ||
|
| + (!read_any_size && bytes_to_read < bytes_available)) {
|
| no_space = true;
|
| return may_discard;
|
| }
|
| @@ -375,9 +235,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| }
|
|
|
| if (no_space) {
|
| - // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this
|
| - // message's data. The message will still be in queue unless
|
| - // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
|
| + // |*num_handles| wasn't sufficient to hold this message's data. The message
|
| + // will still be in queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
|
| return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| }
|
|
|
| @@ -397,11 +256,11 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| // Alright! We have a message and the caller has provided sufficient storage
|
| // in which to receive it.
|
|
|
| - scoped_ptr<PortsMessage> message(
|
| + scoped_ptr<PortsMessage> msg(
|
| static_cast<PortsMessage*>(ports_message.release()));
|
|
|
| const MessageHeader* header =
|
| - static_cast<const MessageHeader*>(message->payload_bytes());
|
| + static_cast<const MessageHeader*>( msg->payload_bytes());
|
| const DispatcherHeader* dispatcher_headers =
|
| reinterpret_cast<const DispatcherHeader*>(header + 1);
|
|
|
| @@ -420,18 +279,17 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| const DispatcherHeader& dh = dispatcher_headers[i];
|
| Type type = static_cast<Type>(dh.type);
|
|
|
| - DCHECK_GE(message->num_payload_bytes(),
|
| + DCHECK_GE(msg->num_payload_bytes(),
|
| data_payload_index + dh.num_bytes);
|
| - DCHECK_GE(message->num_ports(),
|
| + DCHECK_GE(msg->num_ports(),
|
| port_index + dh.num_ports);
|
| - DCHECK_GE(message->num_handles(),
|
| + DCHECK_GE(msg->num_handles(),
|
| platform_handle_index + dh.num_platform_handles);
|
|
|
| PlatformHandle* out_handles =
|
| - message->num_handles() ? message->handles() + platform_handle_index
|
| - : nullptr;
|
| + msg->num_handles() ? msg->handles() + platform_handle_index : nullptr;
|
| dispatchers[i].dispatcher = Dispatcher::Deserialize(
|
| - type, dispatcher_data, dh.num_bytes, message->ports() + port_index,
|
| + type, dispatcher_data, dh.num_bytes, msg->ports() + port_index,
|
| dh.num_ports, out_handles, dh.num_platform_handles);
|
| if (!dispatchers[i].dispatcher)
|
| return MOJO_RESULT_UNKNOWN;
|
| @@ -447,12 +305,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| return MOJO_RESULT_UNKNOWN;
|
| }
|
|
|
| - // Copy message bytes.
|
| - DCHECK_GE(message->num_payload_bytes(), header->header_size);
|
| - const char* payload = reinterpret_cast<const char*>(message->payload_bytes());
|
| - memcpy(bytes, payload + header->header_size,
|
| - message->num_payload_bytes() - header->header_size);
|
| -
|
| + CHECK(msg);
|
| + message->reset(MessageForTransit::WrapPortsMessage(std::move(msg)));
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
|
|