Chromium Code Reviews| Index: mojo/edk/system/message_pipe_dispatcher.cc |
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
| index 187176d8d481e2111ea4edb861a5d6155d82242a..f41668f64b1224f21be990663fcf8281ca4949cf 100644 |
| --- a/mojo/edk/system/message_pipe_dispatcher.cc |
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc |
| @@ -11,6 +11,7 @@ |
| #include "base/memory/scoped_ptr.h" |
| #include "mojo/edk/embedder/embedder_internal.h" |
| #include "mojo/edk/system/core.h" |
| +#include "mojo/edk/system/message_for_transit.h" |
| #include "mojo/edk/system/node_controller.h" |
| #include "mojo/edk/system/ports_message.h" |
| #include "mojo/edk/system/request_context.h" |
| @@ -22,35 +23,6 @@ namespace { |
| #pragma pack(push, 1) |
| -// Header attached to every message sent over a message pipe. |
| -struct MessageHeader { |
| - // The number of serialized dispatchers included in this header. |
| - uint32_t num_dispatchers; |
| - |
| - // Total size of the header, including serialized dispatcher data. |
| - uint32_t header_size; |
| -}; |
| - |
| -static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size."); |
| - |
| -// Header for each dispatcher, immediately following the message header. |
| -struct DispatcherHeader { |
| - // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. |
| - int32_t type; |
| - |
| - // The size of the serialized dispatcher, not including this header. |
| - uint32_t num_bytes; |
| - |
| - // The number of ports needed to deserialize this dispatcher. |
| - uint32_t num_ports; |
| - |
| - // The number of platform handles needed to deserialize this dispatcher. |
| - uint32_t num_platform_handles; |
| -}; |
| - |
| -static_assert(sizeof(DispatcherHeader) % 8 == 0, |
| - "Invalid DispatcherHeader size."); |
| - |
| struct SerializedState { |
| uint64_t pipe_id; |
| int8_t endpoint; |
| @@ -157,10 +129,7 @@ MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
| } |
| MojoResult MessagePipeDispatcher::WriteMessage( |
| - const void* bytes, |
| - uint32_t num_bytes, |
| - const DispatcherInTransit* dispatchers, |
| - uint32_t num_dispatchers, |
| + std::unique_ptr<MessageForTransit> message, |
| MojoWriteMessageFlags flags) { |
| { |
| @@ -169,153 +138,39 @@ MojoResult MessagePipeDispatcher::WriteMessage( |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| } |
| - // A structure for retaining information about every Dispatcher we're about |
| - // to send. This information is collected by calling StartSerialize() on |
| - // each dispatcher in sequence. |
| - struct DispatcherInfo { |
| - uint32_t num_bytes; |
| - uint32_t num_ports; |
| - uint32_t num_handles; |
| - }; |
| - |
| - // This is only the base header size. It will grow as we accumulate the |
| - // size of serialized state for each dispatcher. |
| - size_t header_size = sizeof(MessageHeader) + |
| - num_dispatchers * sizeof(DispatcherHeader); |
| - |
| - size_t num_ports = 0; |
| - size_t num_handles = 0; |
| - |
| - std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); |
| - for (size_t i = 0; i < num_dispatchers; ++i) { |
| - Dispatcher* d = dispatchers[i].dispatcher.get(); |
| - d->StartSerialize(&dispatcher_info[i].num_bytes, |
| - &dispatcher_info[i].num_ports, |
| - &dispatcher_info[i].num_handles); |
| - header_size += dispatcher_info[i].num_bytes; |
| - num_ports += dispatcher_info[i].num_ports; |
| - num_handles += dispatcher_info[i].num_handles; |
| - } |
| + size_t num_bytes = message->num_bytes(); |
| + int rv = node_controller_->SendMessage(port_, message->message()); |
| - // We now have enough information to fully allocate the message storage. |
| - scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage( |
| - header_size + num_bytes, num_ports, num_handles); |
| - DCHECK(message); |
| - |
| - // Populate the message header with information about serialized dispatchers. |
| - // |
| - // The front of the message is always a MessageHeader followed by a |
| - // DispatcherHeader for each dispatcher to be sent. |
| - MessageHeader* header = |
| - static_cast<MessageHeader*>(message->mutable_payload_bytes()); |
| - DispatcherHeader* dispatcher_headers = |
| - reinterpret_cast<DispatcherHeader*>(header + 1); |
| - |
| - // Serialized dispatcher state immediately follows the series of |
| - // DispatcherHeaders. |
| - char* dispatcher_data = |
| - reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); |
| - |
| - header->num_dispatchers = num_dispatchers; |
| - |
| - // |header_size| is the total number of bytes preceding the message payload, |
| - // including all dispatcher headers and serialized dispatcher state. |
| - DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); |
| - header->header_size = static_cast<uint32_t>(header_size); |
| - |
| - bool cancel_transit = false; |
| - if (num_dispatchers > 0) { |
| - ScopedPlatformHandleVectorPtr handles( |
| - new PlatformHandleVector(num_handles)); |
| - size_t port_index = 0; |
| - size_t handle_index = 0; |
| - for (size_t i = 0; i < num_dispatchers; ++i) { |
| - Dispatcher* d = dispatchers[i].dispatcher.get(); |
| - DispatcherHeader* dh = &dispatcher_headers[i]; |
| - const DispatcherInfo& info = dispatcher_info[i]; |
| - |
| - // Fill in the header for this dispatcher. |
| - dh->type = static_cast<int32_t>(d->GetType()); |
| - dh->num_bytes = info.num_bytes; |
| - dh->num_ports = info.num_ports; |
| - dh->num_platform_handles = info.num_handles; |
| - |
| - // Fill in serialized state, ports, and platform handles. We'll cancel |
| - // the send if the dispatcher implementation rejects for some reason. |
| - if (!d->EndSerialize(static_cast<void*>(dispatcher_data), |
| - message->mutable_ports() + port_index, |
| - handles->data() + handle_index)) { |
| - cancel_transit = true; |
| - break; |
| - } |
| - |
| - dispatcher_data += info.num_bytes; |
| - port_index += info.num_ports; |
| - handle_index += info.num_handles; |
| - } |
| + DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
| + << " [port=" << port_.name() << "; rv=" << rv |
| + << "; num_bytes=" << num_bytes << "]"; |
| - if (!cancel_transit) { |
| - // Take ownership of all the handles and move them into message storage. |
| - message->SetHandles(std::move(handles)); |
| - } else { |
| - // Release any platform handles we've accumulated. Their dispatchers |
| - // retain ownership when transit is canceled, so these are not actually |
| - // leaking. |
| - handles->clear(); |
| + if (rv != ports::OK) { |
| + if (rv == ports::ERROR_PORT_UNKNOWN || |
| + rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
| + rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
| + return MOJO_RESULT_INVALID_ARGUMENT; |
| + } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
| + base::AutoLock lock(signal_lock_); |
| + awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| + return MOJO_RESULT_FAILED_PRECONDITION; |
| } |
| - } |
| - MojoResult result = MOJO_RESULT_OK; |
| - if (!cancel_transit) { |
| - // Copy the message body. |
| - void* message_body = static_cast<void*>( |
| - static_cast<char*>(message->mutable_payload_bytes()) + header_size); |
| - memcpy(message_body, bytes, num_bytes); |
| - |
| - int rv = node_controller_->SendMessage(port_, &message); |
| - |
| - DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
| - << " [port=" << port_.name() << "; rv=" << rv |
| - << "; num_bytes=" << num_bytes << "]"; |
| - |
| - if (rv != ports::OK) { |
| - if (rv == ports::ERROR_PORT_UNKNOWN || |
| - rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
| - rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
| - result = MOJO_RESULT_INVALID_ARGUMENT; |
| - } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
| - base::AutoLock lock(signal_lock_); |
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| - result = MOJO_RESULT_FAILED_PRECONDITION; |
| - } else { |
| - NOTREACHED(); |
| - result = MOJO_RESULT_UNKNOWN; |
| - } |
| - cancel_transit = true; |
| - } else { |
| - DCHECK(!message); |
| - } |
| - } |
| - |
| - if (cancel_transit) { |
| - // We ended up not sending the message. Release all the platform handles. |
| - // Their dipatchers retain ownership when transit is canceled, so these are |
| - // not actually leaking. |
| - DCHECK(message); |
| - Channel::MessagePtr m = message->TakeChannelMessage(); |
| - ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); |
| - if (handles) |
| - handles->clear(); |
| + NOTREACHED(); |
| + return MOJO_RESULT_UNKNOWN; |
| } |
| - return result; |
| + return MOJO_RESULT_OK; |
| } |
| -MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| - uint32_t* num_bytes, |
| - MojoHandle* handles, |
| - uint32_t* num_handles, |
| - MojoReadMessageFlags flags) { |
| +MojoResult MessagePipeDispatcher::ReadMessage( |
| + std::unique_ptr<MessageForTransit>* message, |
| + uint32_t* num_bytes, |
| + MojoHandle* handles, |
| + uint32_t* num_handles, |
| + MojoReadMessageFlags flags, |
| + bool ignore_num_bytes) { |
| + |
| { |
| base::AutoLock lock(signal_lock_); |
| // We can't read from a port that's closed or in transit! |
| @@ -326,22 +181,23 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| bool no_space = false; |
| bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
| - // Ensure the provided buffers are large enough to hold the next message. |
| - // GetMessageIf provides an atomic way to test the next message without |
| - // committing to removing it from the port's underlying message queue until |
| - // we are sure we can consume it. |
| + // Grab a message if the provided handles buffer is large enough. If the input |
| + // |num_bytes| is provided and |ignore_num_bytes| is false, we also ensure |
| + // that it specifies a size at least as large as the next available payload. |
| ports::ScopedMessage ports_message; |
| int rv = node_controller_->node()->GetMessageIf( |
| port_, |
| - [num_bytes, num_handles, &no_space, &may_discard]( |
| + [num_handles, num_bytes, &no_space, &may_discard, &ignore_num_bytes]( |
|
Anand Mistry (off Chromium)
2016/04/19 12:57:35
why did you swap bytes and handles? And why is ign
Ken Rockot(use gerrit already)
2016/04/19 18:11:25
Didn't notice I had done the swap - probably from
|
| const ports::Message& next_message) { |
| const PortsMessage& message = |
| static_cast<const PortsMessage&>(next_message); |
| - DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader)); |
| + DCHECK_GE(message.num_payload_bytes(), |
| + sizeof(MessageForTransit::MessageHeader)); |
|
Anand Mistry (off Chromium)
2016/04/19 12:57:36
You could stick a "using MessageForTransit::Messag
Ken Rockot(use gerrit already)
2016/04/19 18:11:25
Done
|
| - const MessageHeader* header = |
| - static_cast<const MessageHeader*>(message.payload_bytes()); |
| + const MessageForTransit::MessageHeader* header = |
| + static_cast<const MessageForTransit::MessageHeader*>( |
| + message.payload_bytes()); |
| DCHECK_LE(header->header_size, message.num_payload_bytes()); |
| uint32_t bytes_to_read = 0; |
| @@ -360,8 +216,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| *num_handles = handles_available; |
| } |
| - if (bytes_to_read < bytes_available || |
| - handles_to_read < handles_available) { |
| + if (handles_to_read < handles_available || |
| + (!ignore_num_bytes && bytes_to_read < bytes_available)) { |
| no_space = true; |
| return may_discard; |
| } |
| @@ -380,9 +236,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| } |
| if (no_space) { |
| - // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this |
| - // message's data. The message will still be in queue unless |
| - // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
| + // |*num_handles| wasn't sufficient to hold this message's data. The message |
| + // will still be in queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
| return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| } |
| @@ -402,13 +257,14 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| // Alright! We have a message and the caller has provided sufficient storage |
| // in which to receive it. |
| - scoped_ptr<PortsMessage> message( |
| + scoped_ptr<PortsMessage> msg( |
| static_cast<PortsMessage*>(ports_message.release())); |
| - const MessageHeader* header = |
| - static_cast<const MessageHeader*>(message->payload_bytes()); |
| - const DispatcherHeader* dispatcher_headers = |
| - reinterpret_cast<const DispatcherHeader*>(header + 1); |
| + const MessageForTransit::MessageHeader* header = |
| + static_cast<const MessageForTransit::MessageHeader*>( |
| + msg->payload_bytes()); |
| + const MessageForTransit::DispatcherHeader* dispatcher_headers = |
| + reinterpret_cast<const MessageForTransit::DispatcherHeader*>(header + 1); |
| const char* dispatcher_data = reinterpret_cast<const char*>( |
| dispatcher_headers + header->num_dispatchers); |
| @@ -417,26 +273,25 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| if (header->num_dispatchers > 0) { |
| CHECK(handles); |
| std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers); |
| - size_t data_payload_index = sizeof(MessageHeader) + |
| - header->num_dispatchers * sizeof(DispatcherHeader); |
| + size_t data_payload_index = sizeof(MessageForTransit::MessageHeader) + |
| + header->num_dispatchers * sizeof(MessageForTransit::DispatcherHeader); |
| size_t port_index = 0; |
| size_t platform_handle_index = 0; |
| for (size_t i = 0; i < header->num_dispatchers; ++i) { |
| - const DispatcherHeader& dh = dispatcher_headers[i]; |
| + const MessageForTransit::DispatcherHeader& dh = dispatcher_headers[i]; |
| Type type = static_cast<Type>(dh.type); |
| - DCHECK_GE(message->num_payload_bytes(), |
| + DCHECK_GE(msg->num_payload_bytes(), |
| data_payload_index + dh.num_bytes); |
| - DCHECK_GE(message->num_ports(), |
| + DCHECK_GE(msg->num_ports(), |
| port_index + dh.num_ports); |
| - DCHECK_GE(message->num_handles(), |
| + DCHECK_GE(msg->num_handles(), |
| platform_handle_index + dh.num_platform_handles); |
| PlatformHandle* out_handles = |
| - message->num_handles() ? message->handles() + platform_handle_index |
| - : nullptr; |
| + msg->num_handles() ? msg->handles() + platform_handle_index : nullptr; |
| dispatchers[i].dispatcher = Dispatcher::Deserialize( |
| - type, dispatcher_data, dh.num_bytes, message->ports() + port_index, |
| + type, dispatcher_data, dh.num_bytes, msg->ports() + port_index, |
| dh.num_ports, out_handles, dh.num_platform_handles); |
| if (!dispatchers[i].dispatcher) |
| return MOJO_RESULT_UNKNOWN; |
| @@ -452,12 +307,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| return MOJO_RESULT_UNKNOWN; |
| } |
| - // Copy message bytes. |
| - DCHECK_GE(message->num_payload_bytes(), header->header_size); |
| - const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); |
| - memcpy(bytes, payload + header->header_size, |
| - message->num_payload_bytes() - header->header_size); |
| - |
| + CHECK(msg); |
| + message->reset(MessageForTransit::WrapPortsMessage(std::move(msg))); |
| return MOJO_RESULT_OK; |
| } |