Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index eb5892bcecbb777b775c6a8374134b4229f0df07..d55a3c975cea1312c2fd3c6bbbcd8435a1fa72da 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -11,6 +11,7 @@ |
#include "base/memory/ref_counted.h" |
#include "mojo/edk/embedder/embedder_internal.h" |
#include "mojo/edk/system/core.h" |
+#include "mojo/edk/system/message_for_transit.h" |
#include "mojo/edk/system/node_controller.h" |
#include "mojo/edk/system/ports_message.h" |
#include "mojo/edk/system/request_context.h" |
@@ -20,36 +21,10 @@ namespace edk { |
namespace { |
-#pragma pack(push, 1) |
- |
-// Header attached to every message sent over a message pipe. |
-struct MessageHeader { |
- // The number of serialized dispatchers included in this header. |
- uint32_t num_dispatchers; |
- |
- // Total size of the header, including serialized dispatcher data. |
- uint32_t header_size; |
-}; |
- |
-static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size."); |
- |
-// Header for each dispatcher, immediately following the message header. |
-struct DispatcherHeader { |
- // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. |
- int32_t type; |
- |
- // The size of the serialized dispatcher, not including this header. |
- uint32_t num_bytes; |
+using DispatcherHeader = MessageForTransit::DispatcherHeader; |
+using MessageHeader = MessageForTransit::MessageHeader; |
- // The number of ports needed to deserialize this dispatcher. |
- uint32_t num_ports; |
- |
- // The number of platform handles needed to deserialize this dispatcher. |
- uint32_t num_platform_handles; |
-}; |
- |
-static_assert(sizeof(DispatcherHeader) % 8 == 0, |
- "Invalid DispatcherHeader size."); |
+#pragma pack(push, 1) |
struct SerializedState { |
uint64_t pipe_id; |
@@ -157,163 +132,44 @@ MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
} |
MojoResult MessagePipeDispatcher::WriteMessage( |
- const void* bytes, |
- uint32_t num_bytes, |
- const DispatcherInTransit* dispatchers, |
- uint32_t num_dispatchers, |
+ std::unique_ptr<MessageForTransit> message, |
MojoWriteMessageFlags flags) { |
- |
- |
if (port_closed_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- // A structure for retaining information about every Dispatcher we're about |
- // to send. This information is collected by calling StartSerialize() on |
- // each dispatcher in sequence. |
- struct DispatcherInfo { |
- uint32_t num_bytes; |
- uint32_t num_ports; |
- uint32_t num_handles; |
- }; |
- |
- // This is only the base header size. It will grow as we accumulate the |
- // size of serialized state for each dispatcher. |
- size_t header_size = sizeof(MessageHeader) + |
- num_dispatchers * sizeof(DispatcherHeader); |
- |
- size_t num_ports = 0; |
- size_t num_handles = 0; |
- |
- std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); |
- for (size_t i = 0; i < num_dispatchers; ++i) { |
- Dispatcher* d = dispatchers[i].dispatcher.get(); |
- d->StartSerialize(&dispatcher_info[i].num_bytes, |
- &dispatcher_info[i].num_ports, |
- &dispatcher_info[i].num_handles); |
- header_size += dispatcher_info[i].num_bytes; |
- num_ports += dispatcher_info[i].num_ports; |
- num_handles += dispatcher_info[i].num_handles; |
- } |
- |
- // We now have enough information to fully allocate the message storage. |
- std::unique_ptr<PortsMessage> message = PortsMessage::NewUserMessage( |
- header_size + num_bytes, num_ports, num_handles); |
- DCHECK(message); |
+ size_t num_bytes = message->num_bytes(); |
+ std::unique_ptr<PortsMessage> msg = message->TakePortsMessage(); |
+ int rv = node_controller_->SendMessage(port_, &msg); |
- // Populate the message header with information about serialized dispatchers. |
- // |
- // The front of the message is always a MessageHeader followed by a |
- // DispatcherHeader for each dispatcher to be sent. |
- MessageHeader* header = |
- static_cast<MessageHeader*>(message->mutable_payload_bytes()); |
- DispatcherHeader* dispatcher_headers = |
- reinterpret_cast<DispatcherHeader*>(header + 1); |
- |
- // Serialized dispatcher state immediately follows the series of |
- // DispatcherHeaders. |
- char* dispatcher_data = |
- reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); |
- |
- header->num_dispatchers = num_dispatchers; |
- |
- // |header_size| is the total number of bytes preceding the message payload, |
- // including all dispatcher headers and serialized dispatcher state. |
- DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); |
- header->header_size = static_cast<uint32_t>(header_size); |
- |
- bool cancel_transit = false; |
- if (num_dispatchers > 0) { |
- ScopedPlatformHandleVectorPtr handles( |
- new PlatformHandleVector(num_handles)); |
- size_t port_index = 0; |
- size_t handle_index = 0; |
- for (size_t i = 0; i < num_dispatchers; ++i) { |
- Dispatcher* d = dispatchers[i].dispatcher.get(); |
- DispatcherHeader* dh = &dispatcher_headers[i]; |
- const DispatcherInfo& info = dispatcher_info[i]; |
- |
- // Fill in the header for this dispatcher. |
- dh->type = static_cast<int32_t>(d->GetType()); |
- dh->num_bytes = info.num_bytes; |
- dh->num_ports = info.num_ports; |
- dh->num_platform_handles = info.num_handles; |
- |
- // Fill in serialized state, ports, and platform handles. We'll cancel |
- // the send if the dispatcher implementation rejects for some reason. |
- if (!d->EndSerialize(static_cast<void*>(dispatcher_data), |
- message->mutable_ports() + port_index, |
- handles->data() + handle_index)) { |
- cancel_transit = true; |
- break; |
- } |
- |
- dispatcher_data += info.num_bytes; |
- port_index += info.num_ports; |
- handle_index += info.num_handles; |
- } |
- |
- if (!cancel_transit) { |
- // Take ownership of all the handles and move them into message storage. |
- message->SetHandles(std::move(handles)); |
- } else { |
- // Release any platform handles we've accumulated. Their dispatchers |
- // retain ownership when transit is canceled, so these are not actually |
- // leaking. |
- handles->clear(); |
- } |
- } |
+ DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
+ << " [port=" << port_.name() << "; rv=" << rv |
+ << "; num_bytes=" << num_bytes << "]"; |
- MojoResult result = MOJO_RESULT_OK; |
- if (!cancel_transit) { |
- // Copy the message body. |
- void* message_body = static_cast<void*>( |
- static_cast<char*>(message->mutable_payload_bytes()) + header_size); |
- memcpy(message_body, bytes, num_bytes); |
- |
- int rv = node_controller_->SendMessage(port_, &message); |
- |
- DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
- << " [port=" << port_.name() << "; rv=" << rv |
- << "; num_bytes=" << num_bytes << "]"; |
- |
- if (rv != ports::OK) { |
- if (rv == ports::ERROR_PORT_UNKNOWN || |
- rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
- rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
- result = MOJO_RESULT_INVALID_ARGUMENT; |
- } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
- base::AutoLock lock(signal_lock_); |
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
- result = MOJO_RESULT_FAILED_PRECONDITION; |
- } else { |
- NOTREACHED(); |
- result = MOJO_RESULT_UNKNOWN; |
- } |
- cancel_transit = true; |
- } else { |
- DCHECK(!message); |
+ if (rv != ports::OK) { |
+ if (rv == ports::ERROR_PORT_UNKNOWN || |
+ rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
+ rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
+ base::AutoLock lock(signal_lock_); |
+ awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
} |
- } |
- if (cancel_transit) { |
- // We ended up not sending the message. Release all the platform handles. |
- // Their dipatchers retain ownership when transit is canceled, so these are |
- // not actually leaking. |
- DCHECK(message); |
- Channel::MessagePtr m = message->TakeChannelMessage(); |
- ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); |
- if (handles) |
- handles->clear(); |
+ NOTREACHED(); |
+ return MOJO_RESULT_UNKNOWN; |
} |
- return result; |
+ return MOJO_RESULT_OK; |
} |
-MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
- uint32_t* num_bytes, |
- MojoHandle* handles, |
- uint32_t* num_handles, |
- MojoReadMessageFlags flags) { |
+MojoResult MessagePipeDispatcher::ReadMessage( |
+ std::unique_ptr<MessageForTransit>* message, |
+ uint32_t* num_bytes, |
+ MojoHandle* handles, |
+ uint32_t* num_handles, |
+ MojoReadMessageFlags flags, |
+ bool read_any_size) { |
// We can't read from a port that's closed or in transit! |
if (port_closed_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
@@ -321,15 +177,17 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
bool no_space = false; |
bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
- // Ensure the provided buffers are large enough to hold the next message. |
- // GetMessageIf provides an atomic way to test the next message without |
- // committing to removing it from the port's underlying message queue until |
- // we are sure we can consume it. |
+ // Grab a message if the provided handles buffer is large enough. If the input |
+ // |num_bytes| is provided and |read_any_size| is false, we also ensure |
+ // that it specifies a size at least as large as the next available payload. |
+ // |
+ // If |read_any_size| is true, the input value of |*num_bytes| is ignored. |
+ // This flag exists to support both new and old API behavior. |
ports::ScopedMessage ports_message; |
int rv = node_controller_->node()->GetMessageIf( |
port_, |
- [num_bytes, num_handles, &no_space, &may_discard]( |
+ [read_any_size, num_bytes, num_handles, &no_space, &may_discard]( |
const ports::Message& next_message) { |
const PortsMessage& message = |
static_cast<const PortsMessage&>(next_message); |
@@ -355,8 +213,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
*num_handles = handles_available; |
} |
- if (bytes_to_read < bytes_available || |
- handles_to_read < handles_available) { |
+ if (handles_to_read < handles_available || |
+ (!read_any_size && bytes_to_read < bytes_available)) { |
no_space = true; |
return may_discard; |
} |
@@ -375,9 +233,9 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
} |
if (no_space) { |
- // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this |
- // message's data. The message will still be in queue unless |
- // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
+ // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't |
+ // sufficient to hold this message's data. The message will still be in |
+ // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
return MOJO_RESULT_RESOURCE_EXHAUSTED; |
} |
@@ -397,11 +255,11 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
// Alright! We have a message and the caller has provided sufficient storage |
// in which to receive it. |
- std::unique_ptr<PortsMessage> message( |
+ std::unique_ptr<PortsMessage> msg( |
static_cast<PortsMessage*>(ports_message.release())); |
const MessageHeader* header = |
- static_cast<const MessageHeader*>(message->payload_bytes()); |
+ static_cast<const MessageHeader*>( msg->payload_bytes()); |
const DispatcherHeader* dispatcher_headers = |
reinterpret_cast<const DispatcherHeader*>(header + 1); |
@@ -420,18 +278,17 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
const DispatcherHeader& dh = dispatcher_headers[i]; |
Type type = static_cast<Type>(dh.type); |
- DCHECK_GE(message->num_payload_bytes(), |
+ DCHECK_GE(msg->num_payload_bytes(), |
data_payload_index + dh.num_bytes); |
- DCHECK_GE(message->num_ports(), |
+ DCHECK_GE(msg->num_ports(), |
port_index + dh.num_ports); |
- DCHECK_GE(message->num_handles(), |
+ DCHECK_GE(msg->num_handles(), |
platform_handle_index + dh.num_platform_handles); |
PlatformHandle* out_handles = |
- message->num_handles() ? message->handles() + platform_handle_index |
- : nullptr; |
+ msg->num_handles() ? msg->handles() + platform_handle_index : nullptr; |
dispatchers[i].dispatcher = Dispatcher::Deserialize( |
- type, dispatcher_data, dh.num_bytes, message->ports() + port_index, |
+ type, dispatcher_data, dh.num_bytes, msg->ports() + port_index, |
dh.num_ports, out_handles, dh.num_platform_handles); |
if (!dispatchers[i].dispatcher) |
return MOJO_RESULT_UNKNOWN; |
@@ -447,12 +304,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
return MOJO_RESULT_UNKNOWN; |
} |
- // Copy message bytes. |
- DCHECK_GE(message->num_payload_bytes(), header->header_size); |
- const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); |
- memcpy(bytes, payload + header->header_size, |
- message->num_payload_bytes() - header->header_size); |
- |
+ CHECK(msg); |
+ *message = MessageForTransit::WrapPortsMessage(std::move(msg)); |
return MOJO_RESULT_OK; |
} |